Skip to content

Commit

Permalink
Merge pull request #983 from neicnordic/feature/api-part-3
Browse files Browse the repository at this point in the history
Admin API part 3
  • Loading branch information
jbygdell authored Aug 14, 2024
2 parents 347acc6 + 1f1b0ec commit f071caf
Show file tree
Hide file tree
Showing 20 changed files with 133 additions and 57 deletions.
2 changes: 2 additions & 0 deletions .github/integration/sda/config.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
log:
format: "json"
level: "debug"
admin:
users: "requester@demo.org"
archive:
type: s3
url: "http://s3"
Expand Down
10 changes: 10 additions & 0 deletions .github/integration/tests/sda/60_api_admin_test.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#!/bin/sh
set -e

token="$(curl http://oidc:8080/tokens | jq -r '.[0]')"
result="$(curl -sk -L "http://api:8080/users/test@dummy.org/files" -H "Authorization: Bearer $token" | jq '. | length')"
if [ "$result" -ne 2 ]; then
echo "wrong number of files returned for user test@dummy.org"
echo "expected 4 got $result"
exit 1
fi
16 changes: 9 additions & 7 deletions sda/cmd/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,14 @@ func setup(config *config.Config) *http.Server {
r.GET("/ready", readinessResponse)
r.GET("/files", getFiles)
// admin endpoints below here
r.POST("/file/ingest", isAdmin(), ingestFile) // start ingestion of a file
r.POST("/file/accession", isAdmin(), setAccession) // assign accession ID to a file
r.POST("/dataset/create", isAdmin(), createDataset) // maps a set of files to a dataset
r.POST("/dataset/release/*dataset", isAdmin(), releaseDataset) // Releases a dataset to be accessible
r.GET("/users", isAdmin(), listActiveUsers) // Lists all users
r.GET("/users/:username/files", isAdmin(), listUserFiles) // Lists all unmapped files for a user
if len(config.API.Admins) > 0 {
r.POST("/file/ingest", isAdmin(), ingestFile) // start ingestion of a file
r.POST("/file/accession", isAdmin(), setAccession) // assign accession ID to a file
r.POST("/dataset/create", isAdmin(), createDataset) // maps a set of files to a dataset
r.POST("/dataset/release/*dataset", isAdmin(), releaseDataset) // Releases a dataset to be accessible
r.GET("/users", isAdmin(), listActiveUsers) // Lists all users
r.GET("/users/:username/files", isAdmin(), listUserFiles) // Lists all unmapped files for a user
}

cfg := &tls.Config{MinVersion: tls.VersionTLS12}

Expand Down Expand Up @@ -377,7 +379,7 @@ func listUserFiles(c *gin.Context) {
username = strings.TrimPrefix(username, "/")
username = strings.TrimSuffix(username, "/files")
log.Debugln(username)
files, err := Conf.API.DB.GetUserFiles(strings.ReplaceAll(username, "@", "_"))
files, err := Conf.API.DB.GetUserFiles(username)
if err != nil {
c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error())

Expand Down
62 changes: 38 additions & 24 deletions sda/cmd/api/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ Admin endpoints are only available to a set of whitelisted users specified in th
- accepts `POST` requests with JSON data with the format: `{"filepath": "</PATH/TO/FILE/IN/INBOX>", "user": "<USERNAME>"}`
- triggers the ingestion of the file.

- Error codes
- `200` Query execute ok.
- `400` Error due to bad payload i.e. wrong `user` + `filepath` combination.
- `401` Token user is not in the list of admins.
- `500` Internal error due to DB or MQ failures.
- Error codes
- `200` Query execute ok.
- `400` Error due to bad payload i.e. wrong `user` + `filepath` combination.
- `401` Token user is not in the list of admins.
- `500` Internal error due to DB or MQ failures.

Example:

Expand All @@ -45,11 +45,11 @@ Admin endpoints are only available to a set of whitelisted users specified in th
- accepts `POST` requests with JSON data with the format: `{"accession_id": "<FILE_ACCESSION>", "filepath": "</PATH/TO/FILE/IN/INBOX>", "user": "<USERNAME>"}`
- assigns accession ID to the file.

- Error codes
- `200` Query execute ok.
- `400` Error due to bad payload i.e. wrong `user` + `filepath` combination.
- `401` Token user is not in the list of admins.
- `500` Internal error due to DB or MQ failures.
- Error codes
- `200` Query execute ok.
- `400` Error due to bad payload i.e. wrong `user` + `filepath` combination.
- `401` Token user is not in the list of admins.
- `500` Internal error due to DB or MQ failures.

Example:

Expand Down Expand Up @@ -77,11 +77,11 @@ Admin endpoints are only available to a set of whitelisted users specified in th
- accepts `POST` requests with the dataset name as last part of the path`
- releases a dataset so that it can be downloaded.
- Error codes
- `200` Query execute ok.
- `400` Error due to bad payload.
- `401` Token user is not in the list of admins.
- `500` Internal error due to DB or MQ failures.
- Error codes
- `200` Query execute ok.
- `400` Error due to bad payload.
- `401` Token user is not in the list of admins.
- `500` Internal error due to DB or MQ failures.
Example:
Expand All @@ -99,22 +99,36 @@ Admin endpoints are only available to a set of whitelisted users specified in th
curl -H "Authorization: Bearer $token" -X GET https://HOSTNAME/users
```

- Error codes
- `200` Query execute ok.
- `401` Token user is not in the list of admins.
- `500` Internal error due to DB failure.
- Error codes
- `200` Query execute ok.
- `401` Token user is not in the list of admins.
- `500` Internal error due to DB failure.

- `/users/:username/files`
- accepts `GET` requests`
- Returns all files for a user with active uploads as a JSON array
- Returns all files (that are not part of a dataset) for a user with active uploads as a JSON array
Example:
```bash
curl -H "Authorization: Bearer $token" -X GET https://HOSTNAME/users/submitter@example.org/files
```

- Error codes
- `200` Query execute ok.
- `401` Token user is not in the list of admins.
- `500` Internal error due to DB failure.
- Error codes
- `200` Query execute ok.
- `401` Token user is not in the list of admins.
- `500` Internal error due to DB failure.

#### Configure Admin users

The users that should have administrative access can be set in two ways:

- As a comma separated list of user identifiers assigned to: `admin.users`.
- As a JSON file containg a list of the user identities, the path to the file is assigned to: `admin.usersFile`. This is the recommended way.

```json
[
"foo-user@example.com",
"bar-user@example.com"
]
```
7 changes: 5 additions & 2 deletions sda/cmd/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"path"
"runtime"
"strconv"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -206,7 +207,7 @@ func TestMain(m *testing.M) {
}

log.Println("starting tests")
_ = m.Run()
code := m.Run()

log.Println("tests completed")
if err := pool.Purge(postgres); err != nil {
Expand All @@ -218,6 +219,8 @@ func TestMain(m *testing.M) {
if err := pool.Purge(oidc); err != nil {
log.Fatalf("Could not purge resource: %s", err)
}

os.Exit(code)
}

type TestSuite struct {
Expand Down Expand Up @@ -1138,7 +1141,7 @@ func (suite *TestSuite) TestListUserFiles() {
testUsers := []string{"user_example.org", "User-B", "User-C"}
for _, user := range testUsers {
for i := 0; i < 5; i++ {
fileID, err := Conf.API.DB.RegisterFile(fmt.Sprintf("/%v/TestGetUserFiles-00%d.c4gh", user, i), user)
fileID, err := Conf.API.DB.RegisterFile(fmt.Sprintf("/%v/TestGetUserFiles-00%d.c4gh", user, i), strings.ReplaceAll(user, "_", "@"))
if err != nil {
suite.FailNow("failed to register file in database")
}
Expand Down
2 changes: 1 addition & 1 deletion sda/cmd/auth/oidc.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func authenticateWithOidc(oauth2Config oauth2.Config, provider *oidc.Provider, c
func validateToken(rawJwt, jwksURL string) (*jwt.Token, string, error) {
set, err := jwk.Fetch(context.Background(), jwksURL)
if err != nil {
return nil, "", fmt.Errorf(err.Error())
return nil, "", fmt.Errorf("%s", err.Error())
}
for it := set.Keys(context.Background()); it.Next(context.Background()); {
pair := it.Pair()
Expand Down
2 changes: 1 addition & 1 deletion sda/cmd/mapper/mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func main() {
log.Debugf("received a message: %s", delivered.Body)
schemaType, err := schemaFromDatasetOperation(delivered.Body)
if err != nil {
log.Errorf(err.Error())
log.Errorf("%s", err.Error())
if err := delivered.Ack(false); err != nil {
log.Errorf("failed to ack message: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion sda/cmd/orchestrate/orchestrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func processQueue(mq *broker.AMQPBroker, queue string, routingKey string, conf *
schemaType, err := schemaNameFromQueue(queue, delivered.Body, conf)

if err != nil {
log.Errorf(err.Error())
log.Error(err.Error())

if err := delivered.Ack(false); err != nil {
log.Errorf("failed to ack message: %v", err)
Expand Down
2 changes: 1 addition & 1 deletion sda/cmd/s3inbox/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,7 @@ func reportError(errorCode int, message string, w http.ResponseWriter) {
return
}
// write the error message to the response
_, err = io.WriteString(w, string(xmlData))
_, err = io.Writer.Write(w, xmlData)
if err != nil {
// errors are logged but otherwised ignored
log.Error(err)
Expand Down
2 changes: 1 addition & 1 deletion sda/cmd/s3inbox/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func startFakeServer(port string) *FakeServer {
f := FakeServer{}
foo := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
f.pinged = true
fmt.Fprintf(w, f.resp)
fmt.Fprint(w, f.resp)
})
ts := httptest.NewUnstartedServer(foo)
ts.Listener.Close()
Expand Down
4 changes: 3 additions & 1 deletion sda/cmd/s3inbox/s3inbox_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func TestMain(m *testing.M) {
}

log.Println("starting tests")
_ = m.Run()
code := m.Run()

log.Println("tests completed")
if err := pool.Purge(postgres); err != nil {
Expand All @@ -193,4 +193,6 @@ func TestMain(m *testing.M) {
if err := pool.Purge(oidc); err != nil {
log.Fatalf("Could not purge resource: %s", err)
}

os.Exit(code)
}
4 changes: 3 additions & 1 deletion sda/cmd/sync/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func TestMain(m *testing.M) {
}

log.Println("starting tests")
_ = m.Run()
code := m.Run()

log.Println("tests completed")
if err := pool.Purge(postgres); err != nil {
Expand All @@ -108,6 +108,8 @@ func TestMain(m *testing.M) {
if _, err := pool.Client.PruneVolumes(pvo); err != nil {
log.Fatalf("could not prune docker volumes: %s", err.Error())
}

os.Exit(code)
}

func (suite *SyncTest) SetupTest() {
Expand Down
4 changes: 3 additions & 1 deletion sda/cmd/syncapi/syncapi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func TestMain(m *testing.M) {
}

log.Println("starting tests")
_ = m.Run()
code := m.Run()

log.Println("tests completed")
if err := pool.Purge(rabbitmq); err != nil {
Expand All @@ -101,6 +101,8 @@ func TestMain(m *testing.M) {
if _, err := pool.Client.PruneVolumes(pvo); err != nil {
log.Fatalf("could not prune docker volumes: %s", err.Error())
}

os.Exit(code)
}

func (suite *SyncAPITest) SetupTest() {
Expand Down
11 changes: 4 additions & 7 deletions sda/internal/broker/broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,9 @@ var tMqconf = MQConf{}

func TestMain(m *testing.M) {
certPath, _ = os.MkdirTemp("", "gocerts")
defer os.RemoveAll(certPath)
helper.MakeCerts(certPath)
_ = writeConf(certPath)

defer func() {
if r := recover(); r != nil {
log.Infoln("Recovered")
}
}()
// uses a sensible default on windows (tcp/http) and linux/osx (socket)
pool, err := dockertest.NewPool("")
if err != nil {
Expand Down Expand Up @@ -102,12 +96,15 @@ func TestMain(m *testing.M) {
log.Panicf("Could not connect to rabbitmq: %s", err)
}

_ = m.Run()
code := m.Run()

log.Println("tests completed")
if err := pool.Purge(rabbitmq); err != nil {
log.Panicf("Could not purge resource: %s", err)
}

os.RemoveAll(certPath)
os.Exit(code)
}

func (suite *BrokerTestSuite) SetupTest() {
Expand Down
17 changes: 17 additions & 0 deletions sda/internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package config
import (
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
"os"
"strings"
Expand Down Expand Up @@ -467,6 +468,22 @@ func NewConfig(app string) (*Config, error) {
if err != nil {
return nil, err
}
if viper.IsSet("admin.usersFile") {
admins, err := os.ReadFile(viper.GetString("admin.usersFile"))
if err != nil {
return nil, err
}

if err := json.Unmarshal(admins, &c.API.Admins); err != nil {
return nil, err
}
}

// This is mainly for convenience when testing stuff
if viper.IsSet("admin.users") {
c.API.Admins = append(c.API.Admins, strings.Split(string(viper.GetString("admin.users")), ",")...)
}
c.configSchemas()
case "auth":
c.Auth.Cega.AuthURL = viper.GetString("auth.cega.authUrl")
c.Auth.Cega.ID = viper.GetString("auth.cega.id")
Expand Down
21 changes: 21 additions & 0 deletions sda/internal/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,27 @@ func (suite *ConfigTestSuite) TestAPIConfiguration() {
assert.Equal(suite.T(), false, config.API.Session.Secure)
assert.Equal(suite.T(), "test", config.API.Session.Domain)
assert.Equal(suite.T(), 60*time.Second, config.API.Session.Expiration)

viper.Reset()
suite.SetupTest()
adminFile, err := os.CreateTemp("", "admins")
assert.NoError(suite.T(), err)
_, err = adminFile.Write([]byte(`["foo@example.com","bar@example.com","baz@example.com"]`))
assert.NoError(suite.T(), err)

viper.Set("admin.usersFile", adminFile.Name())
cFile, err := NewConfig("api")
assert.NoError(suite.T(), err)
assert.Equal(suite.T(), []string{"foo@example.com", "bar@example.com", "baz@example.com"}, cFile.API.Admins)

os.Remove(adminFile.Name())

viper.Reset()
suite.SetupTest()
viper.Set("admin.users", "foo@bar.com,bar@foo.com")
cList, err := NewConfig("api")
assert.NoError(suite.T(), err)
assert.Equal(suite.T(), []string{"foo@bar.com", "bar@foo.com"}, cList.API.Admins)
}

func (suite *ConfigTestSuite) TestNotifyConfiguration() {
Expand Down
4 changes: 3 additions & 1 deletion sda/internal/database/db_functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -634,7 +634,7 @@ func (dbs *SDAdb) getUserFiles(userID string) ([]*SubmissionFileInfo, error) {
files := []*SubmissionFileInfo{}
db := dbs.DB

// select all files of the user, each one annotated with its latest event
// select all files (that are not part of a dataset) of the user, each one annotated with its latest event
const query = "SELECT f.submission_file_path, e.event, f.created_at FROM sda.files f " +
"LEFT JOIN (SELECT DISTINCT ON (file_id) file_id, started_at, event FROM sda.file_event_log ORDER BY file_id, started_at DESC) e ON f.id = e.file_id WHERE f.submission_user = $1 " +
"AND f.id NOT IN (SELECT f.id FROM sda.files f RIGHT JOIN sda.file_dataset d ON f.id = d.file_id); "
Expand Down Expand Up @@ -662,6 +662,7 @@ func (dbs *SDAdb) getUserFiles(userID string) ([]*SubmissionFileInfo, error) {
return files, nil
}

// get the correlation ID for a user-inbox_path combination
func (dbs *SDAdb) GetCorrID(user, path string) (string, error) {
var (
corrID string
Expand Down Expand Up @@ -692,6 +693,7 @@ func (dbs *SDAdb) getCorrID(user, path string) (string, error) {
return corrID, nil
}

// list all users with files not yet assigned to a dataset
func (dbs *SDAdb) ListActiveUsers() ([]string, error) {
dbs.checkAndReconnectIfNeeded()
db := dbs.DB
Expand Down
Loading

0 comments on commit f071caf

Please sign in to comment.