-
Notifications
You must be signed in to change notification settings - Fork 15
/
Copy pathdatasources.py
90 lines (68 loc) · 2.46 KB
/
datasources.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
from typing import List, Optional, Union
from pydantic import BaseModel, Field
import minds.exceptions as exc
class DatabaseConfig(BaseModel):
name: str
engine: str
description: str
connection_data: Union[dict, None] = {}
tables: Union[List[str], None] = []
class Datasource(DatabaseConfig):
...
class Datasources:
def __init__(self, client):
self.api = client.api
def create(self, ds_config: DatabaseConfig, replace=False):
"""
Create new datasource and return it
:param ds_config: datasource configuration, properties:
- name: str, name of datatasource
- engine: str, type of database handler, for example 'postgres', 'mysql', ...
- description: str, description of the database. Used by mind to know what data can be got from it.
- connection_data: dict, optional, credentials to connect to database
- tables: list of str, optional, list of allowed tables
:return: datasource object
"""
name = ds_config.name
if replace:
try:
self.get(name)
self.drop(name, force=True)
except exc.ObjectNotFound:
...
self.api.post('/datasources', data=ds_config.model_dump())
return self.get(name)
def list(self) -> List[Datasource]:
"""
Returns list of datasources
:return: iterable datasources
"""
data = self.api.get('/datasources').json()
ds_list = []
for item in data:
# TODO skip not sql skills
if item.get('engine') is None:
continue
ds_list.append(Datasource(**item))
return ds_list
def get(self, name: str) -> Datasource:
"""
Get datasource by name
:param name: name of datasource
:return: datasource object
"""
data = self.api.get(f'/datasources/{name}').json()
# TODO skip not sql skills
if data.get('engine') is None:
raise exc.ObjectNotSupported(f'Wrong type of datasource: {name}')
return Datasource(**data)
def drop(self, name: str, force=False):
"""
Drop datasource by name
:param name: name of datasource
:param force: if True - remove from all minds, default: False
"""
data = None
if force:
data = {'cascade': True}
self.api.delete(f'/datasources/{name}', data=data)