2
2
from pathlib import Path
3
3
from typing import Optional , Tuple , Union , List
4
4
from beastiary import crud , schemas
5
- from beastiary .models . trace import Trace
5
+ from beastiary .db . database import Database
6
6
from pydantic .utils import is_valid_field
7
- from sqlalchemy .orm .session import Session
8
- from beastiary .schemas .sample import SampleCreate
9
7
from beastiary .log import logger
10
8
import os , math , errno
11
9
@@ -39,45 +37,41 @@ def is_valid_log_file(headers_line: str, delimiter: Optional[str] = None) -> boo
39
37
return False
40
38
41
39
42
- def add_trace (db : Session , trace_in : schemas .TraceCreate ) -> Trace :
40
+ def add_trace (db : dict , trace_in : schemas .TraceCreate ) -> dict :
43
41
if not trace_in .path .is_file ():
44
42
raise FileNotFoundError (errno .ENOENT , os .strerror (errno .ENOENT ), trace_in .path )
45
43
last_byte , headers_line = get_headers (trace_in .path , delimiter = trace_in .delimiter )
46
44
if not is_valid_log_file (headers_line , delimiter = trace_in .delimiter ):
47
45
raise ValueError (f"Invalid log file: { trace_in .path } " )
48
46
logger .debug (f"Creating trace: { trace_in } " )
49
47
trace = crud .trace .create (
50
- db = db , obj_in = trace_in , headers_line = headers_line , last_byte = last_byte
48
+ db , obj_in = trace_in , last_byte = last_byte , headers_line = headers_line
51
49
)
52
50
logger .debug (f"Created trace: { trace } " )
53
51
return trace
54
52
55
53
56
- def read_lines (trace : Trace ) -> Tuple [int , list ]:
54
+ def read_lines (trace : schemas . Trace ) -> Tuple [int , list ]:
57
55
logger .debug (f"reading lines from: { trace } " )
58
- if not trace . path :
56
+ if not trace [ " path" ] :
59
57
raise ValueError ("Path must be set." )
60
- with open (trace . path , "r" ) as f :
61
- f .seek (trace . last_byte , 0 )
58
+ with open (trace [ " path" ] , "r" ) as f :
59
+ f .seek (trace [ " last_byte" ] , 0 )
62
60
lines = f .readlines ()
63
61
if lines :
64
62
last_byte = f .tell ()
65
63
else :
66
- last_byte = trace . last_byte
64
+ last_byte = trace [ " last_byte" ]
67
65
logger .debug (f"last_byte = { last_byte } " )
68
66
logger .debug (f"lines found = { len (lines )} " )
69
67
return last_byte , lines
70
68
71
69
72
70
def lines_to_SampleCreate (
73
- headers : list , lines : list , delimiter : Optional [str ] = None
74
- ) -> List [SampleCreate ]:
71
+ headers : list , lines : list , trace_id : int , delimiter : Optional [str ] = None
72
+ ) -> List [dict ]:
75
73
samples = []
76
74
for line in lines :
77
- if not line :
78
- # blank lines are bad...
79
- # you'll have to have a smart way to handel this with the byte offset
80
- raise ValueError ("Poorly formated line." )
81
75
data = {}
82
76
line = line .strip () # strip \n
83
77
for header , value in zip (headers , line .split (delimiter )):
@@ -90,18 +84,20 @@ def lines_to_SampleCreate(
90
84
sample_in = {}
91
85
sample_in ["data" ] = data
92
86
sample_in ["state" ] = data ["state" ]
93
- samples .append (schemas .sample .SampleCreate (** sample_in ))
87
+ sample_in ["trace_id" ] = trace_id
88
+ samples .append (sample_in )
94
89
return samples
95
90
96
91
97
- def check_for_new_samples (
98
- db : Session , trace : Trace , delimiter : Optional [str ] = None
99
- ) -> None :
92
+ def check_for_new_samples (db : Database , trace : schemas .Trace ) -> None :
100
93
last_byte , lines = read_lines (trace )
101
94
in_samples = lines_to_SampleCreate (
102
- trace .headers_line .split (delimiter ), lines , delimiter = trace .delimiter
95
+ trace ["headers_line" ].split (trace ["delimiter" ]),
96
+ lines ,
97
+ trace_id = trace ["id" ],
98
+ delimiter = trace ["delimiter" ],
103
99
)
104
100
if in_samples :
105
- crud .sample .create_multi_with_trace (db , objs_in = in_samples , trace_id = trace . id )
101
+ crud .sample .create_multi (db , objs_in = in_samples )
106
102
# update the trace byte
107
103
crud .trace .update (db , db_obj = trace , obj_in = {"last_byte" : last_byte })
0 commit comments