Skip to content

Commit

Permalink
Heartbeat activities
Browse files Browse the repository at this point in the history
  • Loading branch information
JohnMwashuma committed Apr 12, 2024
1 parent e1be08d commit 5c25fc0
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 0 deletions.
13 changes: 13 additions & 0 deletions activities/geoparquet_to_geojson.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/apache/arrow/go/v14/parquet/file"
"github.com/onaio/akuko-temporal-go-tooling/internal/geojson"
"github.com/onaio/akuko-temporal-go-tooling/internal/geoparquet"
"go.temporal.io/sdk/activity"
)

// ReadFileBytes reads a file from a filepath and returns its contents as a byte slice
Expand Down Expand Up @@ -53,12 +54,18 @@ type ConvertGeoParquetToGeoJSONActivityReturnType struct {
}

func ConvertGeoParquetToGeoJSONActivity(ctx context.Context, params *ConvertGeoParquetToGeoJSONActivityParams) (*ConvertGeoParquetToGeoJSONActivityReturnType, error) {
message := "Reading geoparquet file bytes"
fmt.Println(message)
activity.RecordHeartbeat(ctx, message)
fileBytes, err := ReadFileBytes(params.GeoParquetFilePath)
if err != nil {
fmt.Println("Error:", err)
return nil, err
}

message = "Converting geoparquet to geojson"
fmt.Println(message)
activity.RecordHeartbeat(ctx, message)
// Convert from GeoParquet to GeoJSON
geoJSONBuffer := &bytes.Buffer{}
err = geojson.FromParquet(bytes.NewReader(fileBytes), geoJSONBuffer)
Expand All @@ -74,6 +81,9 @@ func ConvertGeoParquetToGeoJSONActivity(ctx context.Context, params *ConvertGeoP
}
defer reader.Close()

message = "Getting geoparquet metadata"
fmt.Println(message)
activity.RecordHeartbeat(ctx, message)
metadata, metadataErr := geoparquet.GetMetadata(reader.MetaData().GetKeyValueMetadata())
if metadataErr != nil {
fmt.Println("Error converting from Parquet to GeoJSON: %v", metadataErr)
Expand All @@ -82,6 +92,9 @@ func ConvertGeoParquetToGeoJSONActivity(ctx context.Context, params *ConvertGeoP

fmt.Println("MetaData: %s", &metadata)

message = "Writting geojson data to disk"
fmt.Println(message)
activity.RecordHeartbeat(ctx, message)
err = WriteFileBytes(params.GeoJSONFilePath, geoJSONBuffer.Bytes())
if err != nil {
fmt.Println("Error writing file to disk: %v", err)
Expand Down
6 changes: 6 additions & 0 deletions activities/sanitize_geojson_feature_properties.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"io/ioutil"
"regexp"
"strings"

"go.temporal.io/sdk/activity"
)

// sanitizeStringToCubeSyntax sanitizes a string to conform to Cube syntax rules.
Expand Down Expand Up @@ -96,6 +98,8 @@ type SanitizeGeoJSONFeaturePropertiesActivityReturnType struct {
}

func SanitizeGeoJSONFeaturePropertiesActivity(ctx context.Context, params *SanitizeGeoJSONFeaturePropertiesActivityParams) (*SanitizeGeoJSONFeaturePropertiesActivityReturnType, error) {
message := "Sanitizing geojson file"
activity.RecordHeartbeat(ctx, message)
inputFile := params.GeoJSONFilePath
outputFile := params.GeoJSONFilePath

Expand All @@ -105,6 +109,8 @@ func SanitizeGeoJSONFeaturePropertiesActivity(ctx context.Context, params *Sanit
return nil, err
}

message = "GeoJSON file sanitized successfully."
activity.RecordHeartbeat(ctx, message)
fmt.Println("GeoJSON file sanitized successfully.")
var data = SanitizeGeoJSONFeaturePropertiesActivityReturnType{
FilePath: outputFile,
Expand Down

0 comments on commit 5c25fc0

Please sign in to comment.