13
13
// limitations under the License.
14
14
// -------------------------------------------------------------------------------------------------
15
15
16
- use log :: { error , info } ;
17
- use nautilus_infrastructure :: sql :: pg :: { connect_pg, get_postgres_connect_options} ;
18
- use sqlx :: PgPool ;
16
+ use nautilus_infrastructure :: sql :: pg :: {
17
+ connect_pg, drop_postgres , get_postgres_connect_options, init_postgres ,
18
+ } ;
19
19
20
20
use crate :: opt:: { DatabaseCommand , DatabaseOpt } ;
21
21
22
- /// Scans current path with keyword `nautilus_trader` and build schema dir
23
- fn get_schema_dir ( ) -> anyhow:: Result < String > {
24
- std:: env:: var ( "SCHEMA_DIR" ) . or_else ( |_| {
25
- let nautilus_git_repo_name = "nautilus_trader" ;
26
- let binding = std:: env:: current_dir ( ) . unwrap ( ) ;
27
- let current_dir = binding. to_str ( ) . unwrap ( ) ;
28
- match current_dir. find ( nautilus_git_repo_name) {
29
- Some ( index) => {
30
- let schema_path = current_dir[ 0 ..index + nautilus_git_repo_name. len ( ) ] . to_string ( ) + "/schema" ;
31
- Ok ( schema_path)
32
- }
33
- None => anyhow:: bail!( "Could not calculate schema dir from current directory path or SCHEMA_DIR env variable" )
34
- }
35
- } )
36
- }
37
-
38
- pub async fn init_postgres ( pg : & PgPool , database : String , password : String ) -> anyhow:: Result < ( ) > {
39
- info ! ( "Initializing Postgres database with target permissions and schema" ) ;
40
- // create public schema
41
- match sqlx:: query ( "CREATE SCHEMA IF NOT EXISTS public;" )
42
- . execute ( pg)
43
- . await
44
- {
45
- Ok ( _) => info ! ( "Schema public created successfully" ) ,
46
- Err ( err) => error ! ( "Error creating schema public: {:?}" , err) ,
47
- }
48
- // create role if not exists
49
- match sqlx:: query ( format ! ( "CREATE ROLE {database} PASSWORD '{password}' LOGIN;" ) . as_str ( ) )
50
- . execute ( pg)
51
- . await
52
- {
53
- Ok ( _) => info ! ( "Role {} created successfully" , database) ,
54
- Err ( err) => {
55
- if err. to_string ( ) . contains ( "already exists" ) {
56
- info ! ( "Role {} already exists" , database) ;
57
- } else {
58
- error ! ( "Error creating role {}: {:?}" , database, err) ;
59
- }
60
- }
61
- }
62
- // execute all the sql files in schema dir
63
- let schema_dir = get_schema_dir ( ) ?;
64
- let mut sql_files =
65
- std:: fs:: read_dir ( schema_dir) ?. collect :: < Result < Vec < _ > , std:: io:: Error > > ( ) ?;
66
- for file in & mut sql_files {
67
- let file_name = file. file_name ( ) ;
68
- info ! ( "Executing schema file: {:?}" , file_name) ;
69
- let file_path = file. path ( ) ;
70
- let sql_content = std:: fs:: read_to_string ( file_path. clone ( ) ) ?;
71
- for sql_statement in sql_content. split ( ';' ) . filter ( |s| !s. trim ( ) . is_empty ( ) ) {
72
- sqlx:: query ( sql_statement) . execute ( pg) . await ?;
73
- }
74
- }
75
- // grant connect
76
- match sqlx:: query ( format ! ( "GRANT CONNECT ON DATABASE {database} TO {database};" ) . as_str ( ) )
77
- . execute ( pg)
78
- . await
79
- {
80
- Ok ( _) => info ! ( "Connect privileges granted to role {}" , database) ,
81
- Err ( err) => error ! (
82
- "Error granting connect privileges to role {}: {:?}" ,
83
- database, err
84
- ) ,
85
- }
86
- // grant all schema privileges to the role
87
- match sqlx:: query ( format ! ( "GRANT ALL PRIVILEGES ON SCHEMA public TO {database};" ) . as_str ( ) )
88
- . execute ( pg)
89
- . await
90
- {
91
- Ok ( _) => info ! ( "All schema privileges granted to role {}" , database) ,
92
- Err ( err) => error ! (
93
- "Error granting all privileges to role {}: {:?}" ,
94
- database, err
95
- ) ,
96
- }
97
- // grant all table privileges to the role
98
- match sqlx:: query (
99
- format ! ( "GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO {database};" ) . as_str ( ) ,
100
- )
101
- . execute ( pg)
102
- . await
103
- {
104
- Ok ( _) => info ! ( "All tables privileges granted to role {}" , database) ,
105
- Err ( err) => error ! (
106
- "Error granting all privileges to role {}: {:?}" ,
107
- database, err
108
- ) ,
109
- }
110
- // grant all sequence privileges to the role
111
- match sqlx:: query (
112
- format ! ( "GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA public TO {database};" ) . as_str ( ) ,
113
- )
114
- . execute ( pg)
115
- . await
116
- {
117
- Ok ( _) => info ! ( "All sequences privileges granted to role {}" , database) ,
118
- Err ( err) => error ! (
119
- "Error granting all privileges to role {}: {:?}" ,
120
- database, err
121
- ) ,
122
- }
123
- // grant all function privileges to the role
124
- match sqlx:: query (
125
- format ! ( "GRANT EXECUTE ON ALL FUNCTIONS IN SCHEMA public TO {database};" ) . as_str ( ) ,
126
- )
127
- . execute ( pg)
128
- . await
129
- {
130
- Ok ( _) => info ! ( "All functions privileges granted to role {}" , database) ,
131
- Err ( err) => error ! (
132
- "Error granting all privileges to role {}: {:?}" ,
133
- database, err
134
- ) ,
135
- }
136
-
137
- Ok ( ( ) )
138
- }
139
-
140
- pub async fn drop_postgres ( pg : & PgPool , database : String ) -> anyhow:: Result < ( ) > {
141
- // execute drop owned
142
- match sqlx:: query ( format ! ( "DROP OWNED BY {database}" ) . as_str ( ) )
143
- . execute ( pg)
144
- . await
145
- {
146
- Ok ( _) => info ! ( "Dropped owned objects by role {}" , database) ,
147
- Err ( err) => error ! ( "Error dropping owned by role {}: {:?}" , database, err) ,
148
- }
149
- // revoke connect
150
- match sqlx:: query ( format ! ( "REVOKE CONNECT ON DATABASE {database} FROM {database};" ) . as_str ( ) )
151
- . execute ( pg)
152
- . await
153
- {
154
- Ok ( _) => info ! ( "Revoked connect privileges from role {}" , database) ,
155
- Err ( err) => error ! (
156
- "Error revoking connect privileges from role {}: {:?}" ,
157
- database, err
158
- ) ,
159
- }
160
- // revoke privileges
161
- match sqlx:: query (
162
- format ! ( "REVOKE ALL PRIVILEGES ON DATABASE {database} FROM {database};" ) . as_str ( ) ,
163
- )
164
- . execute ( pg)
165
- . await
166
- {
167
- Ok ( _) => info ! ( "Revoked all privileges from role {}" , database) ,
168
- Err ( err) => error ! (
169
- "Error revoking all privileges from role {}: {:?}" ,
170
- database, err
171
- ) ,
172
- }
173
- // execute drop schema
174
- match sqlx:: query ( "DROP SCHEMA IF EXISTS public CASCADE" )
175
- . execute ( pg)
176
- . await
177
- {
178
- Ok ( _) => info ! ( "Dropped schema public" ) ,
179
- Err ( err) => error ! ( "Error dropping schema public: {:?}" , err) ,
180
- }
181
- // drop role
182
- match sqlx:: query ( format ! ( "DROP ROLE IF EXISTS {database};" ) . as_str ( ) )
183
- . execute ( pg)
184
- . await
185
- {
186
- Ok ( _) => info ! ( "Dropped role {}" , database) ,
187
- Err ( err) => error ! ( "Error dropping role {}: {:?}" , database, err) ,
188
- }
189
- Ok ( ( ) )
190
- }
191
-
192
22
pub async fn run_database_command ( opt : DatabaseOpt ) -> anyhow:: Result < ( ) > {
193
23
let command = opt. command . clone ( ) ;
194
24
@@ -207,6 +37,7 @@ pub async fn run_database_command(opt: DatabaseOpt) -> anyhow::Result<()> {
207
37
& pg,
208
38
pg_connect_options. database ,
209
39
pg_connect_options. password ,
40
+ config. schema ,
210
41
)
211
42
. await ?;
212
43
}
0 commit comments