Skip to content

Commit

Permalink
Batching (#4)
Browse files Browse the repository at this point in the history
  • Loading branch information
milandjurdjevic authored Feb 16, 2025
1 parent fc8d7f7 commit 4b1be40
Show file tree
Hide file tree
Showing 10 changed files with 190 additions and 61 deletions.
4 changes: 2 additions & 2 deletions cli/src/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ let import files =
let parse pattern text =
pattern
|> Option.map Grok.create
|> Option.defaultValue (Grok.pattern |> Result.Ok)
|> Option.defaultValue (Ok(Grok.pattern))
|> Result.bind (Grok.extract text)
|> Result.map (Grok.group >> Grok.transform)

let filter expression entries =
expression
|> Option.map (fun exp -> Parser.expression |> Parser.parse exp)
|> Option.map (fun exp -> Parser.filterExpression |> Parser.parse exp)
|> Option.map (fun res ->
res
|> Result.bind (fun filter -> entries |> Result.map (fun entries -> filter, entries)))
Expand Down
2 changes: 2 additions & 0 deletions core/src/Analog.Core.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@
<Compile Include="Filter.fs" />
<Compile Include="Parser.fs" />
<Compile Include="Grok.fs" />
<Compile Include="Batch.fs" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="FParsec" Version="1.1.1" />
<PackageReference Include="FSharp.Control.TaskSeq" Version="0.4.0" />
<PackageReference Include="grok.net" Version="2.0.0" />
<PackageReference Include="PCRE.NET" Version="1.1.0" />
</ItemGroup>
Expand Down
61 changes: 61 additions & 0 deletions core/src/Batch.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
module Analog.Core.Batch

open System
open System.IO
open System.Text.RegularExpressions
open System.Diagnostics.CodeAnalysis
open FSharp.Control

type Capture = string -> string seq * string

let capture ([<StringSyntax("regex")>] regex) : Capture =

let regex =
Regex(regex, RegexOptions.Multiline ||| RegexOptions.Compiled, TimeSpan.FromSeconds(int64 5))

fun input ->
let entries, index =
((List.empty<string>, 0), regex.Matches input)
||> Seq.fold (fun (list, index) item ->
let entry = input[index .. item.Index - 1].Trim()
(if entry.Length > 0 then list @ [ entry ] else list), item.Index)

match entries with
| [] -> [], input
| entries -> entries, input[index..]

let sync (stream: Stream) (capture: Capture) : string seq =
seq {
use reader = new StreamReader(stream)
let mutable leftover = String.Empty

while not reader.EndOfStream do
let buffer = Array.zeroCreate<char> 1024
let block = reader.ReadBlock buffer
let cap = leftover + (buffer |> Array.take block |> String.Concat) |> capture
leftover <- snd cap
yield! fst cap

yield!
match leftover.Trim() with
| "" -> Seq.empty
| leftover -> Seq.singleton leftover
}

let async (stream: Stream) (capture: Capture) =
taskSeq {
use reader = new StreamReader(stream)
let mutable leftover = String.Empty

while not reader.EndOfStream do
let buffer = Array.zeroCreate<char> 1024
let! block = reader.ReadBlockAsync(Memory(buffer))
let cap = leftover + (buffer |> Array.take block |> String.Concat) |> capture
leftover <- snd cap
yield! fst cap

yield!
match leftover.Trim() with
| "" -> Seq.empty
| leftover -> Seq.singleton leftover
}
27 changes: 12 additions & 15 deletions core/src/Filter.fs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
module Analog.Core.Filter

type private LogLiteral = Log.Literal
type private LogEntry = Log.Entry

type Operator =
| EqualOperator
| NotEqualOperator
Expand All @@ -14,35 +11,35 @@ type Operator =
| OrOperator

type Expression =
| LiteralExpression of LogLiteral
| LiteralExpression of Log.Literal
| MemberExpression of string
| BinaryExpression of Expression * Operator * Expression

type private Evaluation =
| TemporaryEvaluation of LogLiteral option
type Evaluation =
| TemporaryEvaluation of Log.Literal option
| FinalEvaluation of bool

let eval expression entry =
let compareLiteral (left: LogLiteral option) (right: LogLiteral option) comparer =
let compareLiteral (left: Log.Literal option) (right: Log.Literal option) comparer =
match left, right with
| Some left, Some right ->
match left, right with
| LogLiteral.StringLiteral _, LogLiteral.StringLiteral _ -> comparer left right
| LogLiteral.NumberLiteral _, LogLiteral.NumberLiteral _ -> comparer left right
| LogLiteral.BooleanLiteral _, LogLiteral.BooleanLiteral _ -> comparer left right
| LogLiteral.TimestampLiteral _, LogLiteral.TimestampLiteral _ -> comparer left right
| Log.StringLiteral _, Log.StringLiteral _ -> comparer left right
| Log.NumberLiteral _, Log.NumberLiteral _ -> comparer left right
| Log.BooleanLiteral _, Log.BooleanLiteral _ -> comparer left right
| Log.TimestampLiteral _, Log.TimestampLiteral _ -> comparer left right
| _ -> false
| _ -> false

let combineLiteral (left: LogLiteral option) (right: LogLiteral option) combiner =
let combineLiteral (left: Log.Literal option) (right: Log.Literal option) combiner =
match left, right with
| Some left, Some right ->
match left, right with
| LogLiteral.BooleanLiteral left, LogLiteral.BooleanLiteral right -> combiner left right
| Log.BooleanLiteral left, Log.BooleanLiteral right -> combiner left right
| _ -> false
| _ -> false

let wrapFinal = LogLiteral.BooleanLiteral >> Some
let wrapFinal = Log.BooleanLiteral >> Some

let compareEvaluation (left: Evaluation) (right: Evaluation) comparer =
match left, right with
Expand All @@ -69,7 +66,7 @@ let eval expression entry =
| AndOperator -> combineEvaluation left right (&&)
| OrOperator -> combineEvaluation left right (||)

let rec loop (expression: Expression) (entry: LogEntry) : Evaluation =
let rec loop (expression: Expression) (entry: Log.Entry) : Evaluation =
match expression with
| LiteralExpression right -> right |> Option.Some |> TemporaryEvaluation
| MemberExpression field -> entry |> Log.literal field |> TemporaryEvaluation
Expand Down
35 changes: 18 additions & 17 deletions core/src/Grok.fs
Original file line number Diff line number Diff line change
@@ -1,29 +1,30 @@
[<RequireQualifiedAccess>]
module Analog.Core.Grok

open GrokNet

type private LogEntry = Log.Entry

type Pattern = Pattern of Grok
type Extract = Extract of GrokResult
type Group = Group of Map<string, string>

let create (pattern: string) : Result<Grok, string> =
let pattern: Pattern =
Grok("\[%{TIMESTAMP_ISO8601:timestamp}\] \[%{LOGLEVEL:loglevel}\] %{GREEDYDATA:message}")
|> Pattern

let create (pattern: string) : Result<Pattern, string> =
try
Grok(pattern) |> Result.Ok
Grok(pattern) |> Pattern |> Result.Ok
with err ->
$"Grok initialization failed with error: {err.Message}" |> Result.Error

let extract (text: string) (pattern: Grok) : Result<GrokResult, string> =
let extract (text: string) (Pattern pattern) : Result<Extract, string> =
try
pattern.Parse text |> Result.Ok
pattern.Parse text |> Extract |> Result.Ok
with err ->
$"Grok extraction failed with error: {err.Message}" |> Result.Error

let pattern: Grok =
Grok("\[%{TIMESTAMP_ISO8601:timestamp}\] \[%{LOGLEVEL:loglevel}\] %{GREEDYDATA:message}")

let group: GrokResult -> Group list =
Seq.fold
let group (Extract extract) =
extract
|> Seq.fold
(fun list item ->
match list with
| [] -> [ Map([ item.Key, string item.Value ]) ]
Expand All @@ -33,16 +34,16 @@ let group: GrokResult -> Group list =
else
(head |> Map.add item.Key (string item.Value)) :: tail)
List.empty
>> List.rev
>> List.map Group
|> List.rev
|> List.map Group

let transform: Group list -> LogEntry list =
let transform: Group list -> Log.Entry list =
List.map (fun (Group group) ->
group
|> Map.toSeq
|> Seq.choose (fun (key, value) ->
match Parser.literal |> Parser.parse value with
match Parser.logLiteral |> Parser.parse value with
| Ok value -> Some(key, value)
| Error _ -> None)
|> Map.ofSeq
|> LogEntry.Entry)
|> Log.Entry)
1 change: 1 addition & 0 deletions core/src/Log.fs
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ type Literal =
type Entry = Entry of Map<string, Literal>

let literal key (Entry entry) = entry |> Map.tryFind key

let empty = Map.empty |> Entry
48 changes: 22 additions & 26 deletions core/src/Parser.fs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,6 @@ module Analog.Core.Parser
open System
open FParsec

type private LogLiteral = Log.Literal
type private FilterExpression = Filter.Expression
type private FilterOperator = Filter.Operator

let parse input parser =
run parser input
|> function
Expand Down Expand Up @@ -37,38 +33,38 @@ let boolCI: Parser<_, unit> =
let stringQuoted: Parser<string, unit> =
skipChar '\'' >>. manyCharsTill anyChar (skipChar '\'')

let literal: Parser<_, unit> =
let logLiteral: Parser<_, unit> =
choice
[ dateTimeOffset |>> LogLiteral.TimestampLiteral
floatFinite |>> LogLiteral.NumberLiteral
boolCI |>> LogLiteral.BooleanLiteral
restOfLine true |>> LogLiteral.StringLiteral ]
[ dateTimeOffset |>> Log.TimestampLiteral
floatFinite |>> Log.NumberLiteral
boolCI |>> Log.BooleanLiteral
restOfLine true |>> Log.StringLiteral ]

let expression: Parser<_, unit> =
let filterExpression: Parser<_, unit> =
let literalExpression: Parser<_, unit> =
choice
[ dateTimeOffset |>> LogLiteral.TimestampLiteral .>> spaces
floatFinite |>> LogLiteral.NumberLiteral .>> spaces
boolCI |>> LogLiteral.BooleanLiteral .>> spaces
stringQuoted |>> LogLiteral.StringLiteral .>> spaces ]
|>> FilterExpression.LiteralExpression
[ dateTimeOffset |>> Log.TimestampLiteral .>> spaces
floatFinite |>> Log.NumberLiteral .>> spaces
boolCI |>> Log.BooleanLiteral .>> spaces
stringQuoted |>> Log.StringLiteral .>> spaces ]
|>> Filter.Expression.LiteralExpression

let memberExpression: Parser<_, unit> =
many1Chars (letter <|> digit) |>> FilterExpression.MemberExpression .>> spaces
many1Chars (letter <|> digit) |>> Filter.Expression.MemberExpression .>> spaces

let operator = OperatorPrecedenceParser<FilterExpression, _, _>()
let operator = OperatorPrecedenceParser<Filter.Expression, _, _>()
operator.TermParser <- choice [ literalExpression; memberExpression ]

let bin op left right =
FilterExpression.BinaryExpression(left, op, right)
Filter.Expression.BinaryExpression(left, op, right)

let add = operator.AddOperator
add (InfixOperator("&", spaces, 1, Associativity.Left, bin FilterOperator.AndOperator))
add (InfixOperator("|", spaces, 2, Associativity.Left, bin FilterOperator.OrOperator))
add (InfixOperator(">", spaces, 3, Associativity.None, bin FilterOperator.GreaterThanOperator))
add (InfixOperator(">=", spaces, 4, Associativity.None, bin FilterOperator.GreaterThanOrEqualOperator))
add (InfixOperator("<", spaces, 5, Associativity.None, bin FilterOperator.LessThanOperator))
add (InfixOperator("<=", spaces, 6, Associativity.None, bin FilterOperator.LessThanOrEqualOperator))
add (InfixOperator("=", spaces, 7, Associativity.None, bin FilterOperator.EqualOperator))
add (InfixOperator("<>", spaces, 8, Associativity.None, bin FilterOperator.NotEqualOperator))
add (InfixOperator("&", spaces, 1, Associativity.Left, bin Filter.AndOperator))
add (InfixOperator("|", spaces, 2, Associativity.Left, bin Filter.OrOperator))
add (InfixOperator(">", spaces, 3, Associativity.None, bin Filter.GreaterThanOperator))
add (InfixOperator(">=", spaces, 4, Associativity.None, bin Filter.GreaterThanOrEqualOperator))
add (InfixOperator("<", spaces, 5, Associativity.None, bin Filter.LessThanOperator))
add (InfixOperator("<=", spaces, 6, Associativity.None, bin Filter.LessThanOrEqualOperator))
add (InfixOperator("=", spaces, 7, Associativity.None, bin Filter.EqualOperator))
add (InfixOperator("<>", spaces, 8, Associativity.None, bin Filter.NotEqualOperator))
operator.ExpressionParser
1 change: 1 addition & 0 deletions core/test/Analog.Core.Tests.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
<Compile Include="GrokTest.fs" />
<Compile Include="ParserTest.fs" />
<Compile Include="FilterTest.fs" />
<Compile Include="BatchTest.fs" />
<Compile Include="Program.fs" />
</ItemGroup>

Expand Down
70 changes: 70 additions & 0 deletions core/test/BatchTest.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
module Analog.Core.Tests.BatchTest

open System.IO
open System.Text
open Analog.Core
open FSharp.Control
open Swensen.Unquote
open Xunit

let input =
"""
[2018-10-15 15:38:23.184 +02:00] [INF] [Topshelf] Started
[2018-10-15 15:40:55.598 +02:00] [INF] [Topshelf] Stopping
[2018-10-15 15:40:55.599 +02:00] [INF] MAUTO Windows Service stopping
[2018-10-15 15:40:55.599 +02:00] [INF] Cron scheduler stopping
[2018-10-15 15:40:55.601 +02:00] [INF] Scheduler QuartzScheduler_$_NON_CLUSTERED shutting down.
[2018-10-15 15:40:55.601 +02:00] [INF] Scheduler QuartzScheduler_$_NON_CLUSTERED paused.
[2018-10-15 15:40:55.604 +02:00] [DBG] Shutting down threadpool...
[2018-10-15 15:40:55.604 +02:00] [DBG] Shutdown of threadpool complete.
[2018-10-15 15:40:55.604 +02:00] [INF] Scheduler QuartzScheduler_$_NON_CLUSTERED Shutdown complete.
[2018-10-15 15:40:55.604 +02:00] [INF] Cron scheduler stopped
[2018-10-15 15:40:55.604 +02:00] [INF] MAUTO Windows Service stopped
[2018-10-15 15:40:55.606 +02:00] [INF] Cron scheduler stopping
[2018-10-15 15:40:55.606 +02:00] [INF] Cron scheduler stopped
[2018-10-15 15:40:55.660 +02:00] [INF] [Topshelf] Stopped
[2018-10-15 15:40:55.953 +02:00] [DBG] WorkerThread is shut down
"""
|> Encoding.UTF8.GetBytes

let expected =
[ "[2018-10-15 15:38:23.184 +02:00] [INF] [Topshelf] Started"
"[2018-10-15 15:40:55.598 +02:00] [INF] [Topshelf] Stopping"
"[2018-10-15 15:40:55.599 +02:00] [INF] MAUTO Windows Service stopping"
"[2018-10-15 15:40:55.599 +02:00] [INF] Cron scheduler stopping"
"[2018-10-15 15:40:55.601 +02:00] [INF] Scheduler QuartzScheduler_$_NON_CLUSTERED shutting down."
"[2018-10-15 15:40:55.601 +02:00] [INF] Scheduler QuartzScheduler_$_NON_CLUSTERED paused."
"[2018-10-15 15:40:55.604 +02:00] [DBG] Shutting down threadpool..."
"[2018-10-15 15:40:55.604 +02:00] [DBG] Shutdown of threadpool complete."
"[2018-10-15 15:40:55.604 +02:00] [INF] Scheduler QuartzScheduler_$_NON_CLUSTERED Shutdown complete."
"[2018-10-15 15:40:55.604 +02:00] [INF] Cron scheduler stopped"
"[2018-10-15 15:40:55.604 +02:00] [INF] MAUTO Windows Service stopped"
"[2018-10-15 15:40:55.606 +02:00] [INF] Cron scheduler stopping"
"[2018-10-15 15:40:55.606 +02:00] [INF] Cron scheduler stopped"
"[2018-10-15 15:40:55.660 +02:00] [INF] [Topshelf] Stopped"
"[2018-10-15 15:40:55.953 +02:00] [DBG] WorkerThread is shut down" ]

[<Fact>]
let ``load stream as sync sequence of strings chopped by regex`` () =
use stream = new MemoryStream(input)


let actual =
Batch.capture "\[\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3} [+-]\d{2}:\d{2}\]"
|> Batch.sync stream
|> Seq.toList

test <@ actual = expected @>

[<Fact>]
let ``load stream as async sequence of strings chopped by regex`` () =
task {
use stream = new MemoryStream(input)

let! actual =
Batch.capture "\[\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3} [+-]\d{2}:\d{2}\]"
|> Batch.async stream
|> TaskSeq.toListAsync

test <@ actual = expected @>
}
2 changes: 1 addition & 1 deletion core/test/ParserTest.fs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ open Analog.Core
open Filter
open Log

let parse text = Parser.expression |> Parser.parse text
let parse text = Parser.filterExpression |> Parser.parse text

[<Fact>]
let ``parse should correctly parse a constant string`` () =
Expand Down

0 comments on commit 4b1be40

Please sign in to comment.