Skip to content

Commit

Permalink
⭐ implement StoreResults for resources data
Browse files Browse the repository at this point in the history
This extends the proto to store resources data directly. This is the next step in the data recordings journey that makes this data available to upstream targets. We are starting our journey in v10, so this is still behind a feature-flag for now.

Signed-off-by: Dominik Richter <dominik.richter@gmail.com>
  • Loading branch information
arlimus committed Jan 23, 2024
1 parent b120ab4 commit 2f1f904
Show file tree
Hide file tree
Showing 17 changed files with 656 additions and 465 deletions.
2 changes: 1 addition & 1 deletion apps/cnquery/cmd/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func (c *cnqueryPlugin) RunQuery(conf *run.RunQueryConfig, runtime *providers.Ru
}
defer func() {
// prevent the recording from being closed multiple times
connectAssetRuntime.Recording = providers.NullRecording{}
connectAssetRuntime.SetRecording(providers.NullRecording{})

Check failure on line 168 in apps/cnquery/cmd/plugin.go

View workflow job for this annotation

GitHub Actions / golangci-lint

Error return value of `connectAssetRuntime.SetRecording` is not checked (errcheck)
sh.Close()
}()

Expand Down
2 changes: 1 addition & 1 deletion apps/cnquery/cmd/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ func RunScan(config *scanConfig) (*explorer.ReportCollection, error) {
opts = append(opts, scan.WithUpstream(config.runtime.UpstreamConfig))
}
if config.runtime.Recording != nil {

Check failure on line 311 in apps/cnquery/cmd/scan.go

View workflow job for this annotation

GitHub Actions / go-test

comparison of function Recording != nil is always true

Check failure on line 311 in apps/cnquery/cmd/scan.go

View workflow job for this annotation

GitHub Actions / go-test

comparison of function Recording != nil is always true
opts = append(opts, scan.WithRecording(config.runtime.Recording))
opts = append(opts, scan.WithRecording(config.runtime.Recording()))
}

scanner := scan.NewLocalScanner(opts...)
Expand Down
620 changes: 322 additions & 298 deletions explorer/cnquery_explorer.pb.go

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion explorer/cnquery_explorer.proto
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,8 @@ message UpdateAssetJobsReq {
// Store results for a given asset
message StoreResultsReq {
string asset_mrn = 1;
map<string, cnquery.llx.Result> data = 3;
map<string, cnquery.llx.Result> queryData = 3;
map<string, cnquery.llx.ResourceRecording> resources = 4;
}

// Retrieve data for a given set of entities which was previously stored
Expand Down
6 changes: 3 additions & 3 deletions explorer/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,14 +258,14 @@ func (e *instance) snapshotResults() map[string]*llx.Result {
return results
}

func (e *instance) StoreData() error {
func (e *instance) StoreQueryData() error {
if e.collector == nil {
return errors.New("cannot store data, no collector provided")
}

_, err := e.collector.StoreResults(context.Background(), &explorer.StoreResultsReq{
AssetMrn: e.assetMrn,
Data: e.snapshotResults(),
AssetMrn: e.assetMrn,
QueryData: e.snapshotResults(),
})

return err
Expand Down
2 changes: 1 addition & 1 deletion explorer/query_conductor.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ func NewAssetMatchError(mrn string, objectType string, errorReason string, asset
}

func (s *LocalServices) StoreResults(ctx context.Context, req *StoreResultsReq) (*Empty, error) {
_, err := s.DataLake.UpdateData(ctx, req.AssetMrn, req.Data)
_, err := s.DataLake.UpdateData(ctx, req.AssetMrn, req.QueryData)
if err != nil {
return globalEmpty, err
}
Expand Down
30 changes: 25 additions & 5 deletions explorer/scan/local_scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type LocalScanner struct {
ctx context.Context
fetcher *fetcher
upstream *upstream.UpstreamConfig
recording providers.Recording
recording llx.Recording
}

type ScannerOption func(*LocalScanner)
Expand All @@ -58,7 +58,7 @@ func WithUpstream(u *upstream.UpstreamConfig) func(s *LocalScanner) {
}
}

func WithRecording(r providers.Recording) func(s *LocalScanner) {
func WithRecording(r llx.Recording) func(s *LocalScanner) {
return func(s *LocalScanner) {
s.recording = r
}
Expand Down Expand Up @@ -267,14 +267,18 @@ func (s *LocalScanner) distributeJob(job *Job, ctx context.Context, upstream *up
return nil, false, err
}
}
runtime.SetRecording(candidate.runtime.Recording)
err = runtime.SetRecording(candidate.runtime.Recording())
if err != nil {
log.Error().Err(err).Msg("unable to set recording for asset (pre-connect)")
continue
}

err = runtime.Connect(&plugin.ConnectReq{
Features: config.Features,
Asset: candidate.asset,
Upstream: upstream,
})
candidate.asset = runtime.Provider.Connection.Asset // to ensure we get all the information the connect call gave us

if err != nil {
log.Error().Err(err).Str("asset", candidate.asset.Name).Msg("unable to connect to asset")
continue
Expand Down Expand Up @@ -692,11 +696,27 @@ func (s *localAssetScanner) runQueryPack() (*AssetReport, error) {
return nil, err
}

err = e.StoreData()
err = e.StoreQueryData()
if err != nil {
return nil, err
}

if cnquery.GetFeatures(s.job.Ctx).IsActive(cnquery.StoreResourcesData) {
recording := s.Runtime.Recording()
data, ok := recording.GetAssetData(s.job.Asset.Mrn)
if !ok {
log.Debug().Msg("not storing resource data for this asset, nothing available")
} else {
_, err = conductor.StoreResults(context.Background(), &explorer.StoreResultsReq{
AssetMrn: s.job.Asset.Mrn,
Resources: data,
})
if err != nil {
return nil, err
}
}
}

ar := &AssetReport{
Mrn: s.job.Asset.Mrn,
Bundle: assetBundle,
Expand Down
6 changes: 6 additions & 0 deletions featureflags.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,12 @@ const (
// start: v8.x
// end: v9.0
ErrorsAsFailures

// StoreResourcesData feature flag
// desc: Stores recording-like data with upstream
// start: v10.x
// end: tbd (candidate: v11.0)
StoreResourcesData
)

// FeaturesValue is a map from feature name to feature flag
Expand Down
Loading

0 comments on commit 2f1f904

Please sign in to comment.