Skip to content

Commit

Permalink
Merge pull request #1356 from pweil-/operator-refactor
Browse files Browse the repository at this point in the history
Operator refactor - validation and import cleanup
  • Loading branch information
openshift-merge-robot authored Aug 13, 2020
2 parents 339382e + 74d6ce9 commit 9b659da
Show file tree
Hide file tree
Showing 15 changed files with 768 additions and 246 deletions.
4 changes: 2 additions & 2 deletions pkg/operator/aws_usage_hive.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ var (
)

// CreateAWSUsageTable instantiates a new external HiveTable CR for AWS Billing/Usage reports stored in S3.
func (op *Reporting) createAWSUsageHiveTableCR(logger logrus.FieldLogger, dataSource *metering.ReportDataSource, tableName, bucket, prefix string, manifests []*aws.Manifest) (*metering.HiveTable, error) {
func (op *defaultReportingOperator) createAWSUsageHiveTableCR(logger logrus.FieldLogger, dataSource *metering.ReportDataSource, tableName, bucket, prefix string, manifests []*aws.Manifest) (*metering.HiveTable, error) {
location, err := hive.S3Location(bucket, prefix)
if err != nil {
return nil, err
Expand Down Expand Up @@ -118,7 +118,7 @@ func (op *Reporting) createAWSUsageHiveTableCR(logger logrus.FieldLogger, dataSo
return hiveTable, nil
}

func (op *Reporting) updateAWSBillingPartitions(logger log.FieldLogger, dataSource *metering.ReportDataSource, source *metering.S3Bucket, hiveTable *metering.HiveTable, manifests []*aws.Manifest) error {
func (op *defaultReportingOperator) updateAWSBillingPartitions(logger log.FieldLogger, dataSource *metering.ReportDataSource, source *metering.S3Bucket, hiveTable *metering.HiveTable, manifests []*aws.Manifest) error {
logger.Infof("updating partitions for Hive table %s", hiveTable.Name)
// Fetch the billing manifests
if len(manifests) == 0 {
Expand Down
30 changes: 15 additions & 15 deletions pkg/operator/datasources.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (

metering "github.com/kube-reporting/metering-operator/pkg/apis/metering/v1"
"github.com/kube-reporting/metering-operator/pkg/aws"
cbInterfaces "github.com/kube-reporting/metering-operator/pkg/generated/clientset/versioned/typed/metering/v1"
clientset "github.com/kube-reporting/metering-operator/pkg/generated/clientset/versioned/typed/metering/v1"
"github.com/kube-reporting/metering-operator/pkg/hive"
"github.com/kube-reporting/metering-operator/pkg/operator/prestostore"
"github.com/kube-reporting/metering-operator/pkg/operator/reporting"
Expand All @@ -34,15 +34,15 @@ const (
expectedArrSplitElementsFQTN = 3
)

func (op *Reporting) runReportDataSourceWorker() {
func (op *defaultReportingOperator) runReportDataSourceWorker() {
logger := op.logger.WithField("component", "reportDataSourceWorker")
logger.Infof("ReportDataSource worker started")
const maxRequeues = 20
for op.processResource(logger, op.syncReportDataSource, "ReportDataSource", op.reportDataSourceQueue, maxRequeues) {
}
}

func (op *Reporting) syncReportDataSource(logger log.FieldLogger, key string) error {
func (op *defaultReportingOperator) syncReportDataSource(logger log.FieldLogger, key string) error {
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
logger.WithError(err).Errorf("invalid resource key :%s", key)
Expand Down Expand Up @@ -70,7 +70,7 @@ func (op *Reporting) syncReportDataSource(logger log.FieldLogger, key string) er
return op.handleReportDataSource(logger, ds)
}

func (op *Reporting) handleReportDataSource(logger log.FieldLogger, dataSource *metering.ReportDataSource) error {
func (op *defaultReportingOperator) handleReportDataSource(logger log.FieldLogger, dataSource *metering.ReportDataSource) error {
if op.cfg.EnableFinalizers && reportDataSourceNeedsFinalizer(dataSource) {
var err error
dataSource, err = op.addReportDataSourceFinalizer(dataSource)
Expand Down Expand Up @@ -98,7 +98,7 @@ func (op *Reporting) handleReportDataSource(logger log.FieldLogger, dataSource *

}

func (op *Reporting) handlePrometheusMetricsDataSource(logger log.FieldLogger, dataSource *metering.ReportDataSource) error {
func (op *defaultReportingOperator) handlePrometheusMetricsDataSource(logger log.FieldLogger, dataSource *metering.ReportDataSource) error {
if dataSource.Spec.PrometheusMetricsImporter == nil {
return fmt.Errorf("%s is not a PrometheusMetricsImporter ReportDataSource", dataSource.Name)
}
Expand Down Expand Up @@ -318,7 +318,7 @@ func (op *Reporting) handlePrometheusMetricsDataSource(logger log.FieldLogger, d
return nil
}

func (op *Reporting) handleAWSBillingDataSource(logger log.FieldLogger, dataSource *metering.ReportDataSource) error {
func (op *defaultReportingOperator) handleAWSBillingDataSource(logger log.FieldLogger, dataSource *metering.ReportDataSource) error {
source := dataSource.Spec.AWSBilling.Source
if source == nil {
return fmt.Errorf("ReportDataSource %q: improperly configured datasource, source is empty", dataSource.Name)
Expand Down Expand Up @@ -410,7 +410,7 @@ func (op *Reporting) handleAWSBillingDataSource(logger log.FieldLogger, dataSour
return nil
}

func (op *Reporting) handlePrestoTableDataSource(logger log.FieldLogger, dataSource *metering.ReportDataSource) error {
func (op *defaultReportingOperator) handlePrestoTableDataSource(logger log.FieldLogger, dataSource *metering.ReportDataSource) error {
if dataSource.Spec.PrestoTable == nil {
return fmt.Errorf("%s is not a PrestoTable ReportDataSource", dataSource.Name)
}
Expand Down Expand Up @@ -457,7 +457,7 @@ func (op *Reporting) handlePrestoTableDataSource(logger log.FieldLogger, dataSou
// processed this resource before, query the Presto table's metadata and then create an unmanaged
// PrestoTable custom resource with the columns returned from the query. Once the PrestoTable resource
// has been created, update the @dataSource Status field to refer to the name of the created PrestoTable.
func (op *Reporting) handleLinkExistingTable(logger log.FieldLogger, dataSource *metering.ReportDataSource) error {
func (op *defaultReportingOperator) handleLinkExistingTable(logger log.FieldLogger, dataSource *metering.ReportDataSource) error {
if dataSource.Spec.LinkExistingTable.TableName == "" {
return fmt.Errorf("invalid configuration passed: spec.linkExistingTable.tableName field cannot be empty")
}
Expand Down Expand Up @@ -520,7 +520,7 @@ func (op *Reporting) handleLinkExistingTable(logger log.FieldLogger, dataSource
return nil
}

func (op *Reporting) handleReportQueryViewDataSource(logger log.FieldLogger, dataSource *metering.ReportDataSource) error {
func (op *defaultReportingOperator) handleReportQueryViewDataSource(logger log.FieldLogger, dataSource *metering.ReportDataSource) error {
if dataSource.Spec.ReportQueryView == nil {
return fmt.Errorf("%s is not a ReportQueryView ReportDataSource", dataSource.Name)
}
Expand Down Expand Up @@ -647,7 +647,7 @@ func (op *Reporting) handleReportQueryViewDataSource(logger log.FieldLogger, dat
return nil
}

func (op *Reporting) addReportDataSourceFinalizer(ds *metering.ReportDataSource) (*metering.ReportDataSource, error) {
func (op *defaultReportingOperator) addReportDataSourceFinalizer(ds *metering.ReportDataSource) (*metering.ReportDataSource, error) {
ds.Finalizers = append(ds.Finalizers, reportDataSourceFinalizer)
newReportDataSource, err := op.meteringClient.MeteringV1().ReportDataSources(ds.Namespace).Update(context.TODO(), ds, metav1.UpdateOptions{})
logger := op.logger.WithFields(log.Fields{"reportDataSource": ds.Name, "namespace": ds.Namespace})
Expand All @@ -659,7 +659,7 @@ func (op *Reporting) addReportDataSourceFinalizer(ds *metering.ReportDataSource)
return newReportDataSource, nil
}

func (op *Reporting) removeReportDataSourceFinalizer(ds *metering.ReportDataSource) (*metering.ReportDataSource, error) {
func (op *defaultReportingOperator) removeReportDataSourceFinalizer(ds *metering.ReportDataSource) (*metering.ReportDataSource, error) {
if !slice.ContainsString(ds.ObjectMeta.Finalizers, reportDataSourceFinalizer, nil) {
return ds, nil
}
Expand All @@ -678,7 +678,7 @@ func reportDataSourceNeedsFinalizer(ds *metering.ReportDataSource) bool {
return ds.ObjectMeta.DeletionTimestamp == nil && !slice.ContainsString(ds.ObjectMeta.Finalizers, reportDataSourceFinalizer, nil)
}

func (op *Reporting) getQueryDependencies(namespace, name string, inputVals []metering.ReportQueryInputValue) (*reporting.ReportQueryDependencies, error) {
func (op *defaultReportingOperator) getQueryDependencies(namespace, name string, inputVals []metering.ReportQueryInputValue) (*reporting.ReportQueryDependencies, error) {
queryGetter := reporting.NewReportQueryListerGetter(op.reportQueryLister)
query, err := queryGetter.GetReportQuery(namespace, name)
if err != nil {
Expand All @@ -691,7 +691,7 @@ func (op *Reporting) getQueryDependencies(namespace, name string, inputVals []me
return result.Dependencies, nil
}

func (op *Reporting) queueDependentReportDataSourcesForDataSource(dataSource *metering.ReportDataSource) error {
func (op *defaultReportingOperator) queueDependentReportDataSourcesForDataSource(dataSource *metering.ReportDataSource) error {
// Look at reportDataSources in the namespace of this dataSource
reportDataSources, err := op.reportDataSourceLister.ReportDataSources(dataSource.Namespace).List(labels.Everything())
if err != nil {
Expand Down Expand Up @@ -724,7 +724,7 @@ func (op *Reporting) queueDependentReportDataSourcesForDataSource(dataSource *me
return nil
}

func (op *Reporting) queueDependentReportsForDataSource(dataSource *metering.ReportDataSource) error {
func (op *defaultReportingOperator) queueDependentReportsForDataSource(dataSource *metering.ReportDataSource) error {
// Look at reports in the namespace of this dataSource
reports, err := op.reportLister.Reports(dataSource.Namespace).List(labels.Everything())
if err != nil {
Expand Down Expand Up @@ -752,7 +752,7 @@ func (op *Reporting) queueDependentReportsForDataSource(dataSource *metering.Rep
return nil
}

func updateReportDataSource(dsClient cbInterfaces.ReportDataSourceInterface, dsName string, updateFunc func(*metering.ReportDataSource)) (*metering.ReportDataSource, error) {
func updateReportDataSource(dsClient clientset.ReportDataSourceInterface, dsName string, updateFunc func(*metering.ReportDataSource)) (*metering.ReportDataSource, error) {
var ds *metering.ReportDataSource
if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
newDS, err := dsClient.Get(context.TODO(), dsName, metav1.GetOptions{})
Expand Down
2 changes: 1 addition & 1 deletion pkg/operator/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ type statusResponse struct {
// readinessHandler is the readiness check for the metering operator. If this
// check fails, no requests will be sent to this pod, and rolling updates will
// not proceed until the checks succeed.
func (op *Reporting) readinessHandler(w http.ResponseWriter, r *http.Request) {
func (op *defaultReportingOperator) readinessHandler(w http.ResponseWriter, r *http.Request) {
logger := newRequestLogger(op.logger, r, op.rand)
if !op.isInitialized() {
logger.Debugf("not ready: operator is not yet initialized")
Expand Down
16 changes: 8 additions & 8 deletions pkg/operator/hivetables.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,15 @@ func init() {
prometheus.MustRegister(hiveTablePartitionsGauge)
}

func (op *Reporting) runHiveTableWorker() {
func (op *defaultReportingOperator) runHiveTableWorker() {
logger := op.logger.WithField("component", "hiveTableWorker")
logger.Infof("HiveTable worker started")
const maxRequeues = 10
for op.processResource(logger, op.syncHiveTable, "HiveTable", op.hiveTableQueue, maxRequeues) {
}
}

func (op *Reporting) syncHiveTable(logger log.FieldLogger, key string) error {
func (op *defaultReportingOperator) syncHiveTable(logger log.FieldLogger, key string) error {
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
logger.WithError(err).Errorf("invalid resource key :%s", key)
Expand Down Expand Up @@ -90,7 +90,7 @@ func (op *Reporting) syncHiveTable(logger log.FieldLogger, key string) error {
return nil
}

func (op *Reporting) handleHiveTable(logger log.FieldLogger, hiveTable *metering.HiveTable) error {
func (op *defaultReportingOperator) handleHiveTable(logger log.FieldLogger, hiveTable *metering.HiveTable) error {
if op.cfg.EnableFinalizers && hiveTableNeedsFinalizer(hiveTable) {
var err error
hiveTable, err = op.addHiveTableFinalizer(hiveTable)
Expand Down Expand Up @@ -275,7 +275,7 @@ func (op *Reporting) handleHiveTable(logger log.FieldLogger, hiveTable *metering
return nil
}

func (op *Reporting) addHiveTableFinalizer(hiveTable *metering.HiveTable) (*metering.HiveTable, error) {
func (op *defaultReportingOperator) addHiveTableFinalizer(hiveTable *metering.HiveTable) (*metering.HiveTable, error) {
hiveTable.Finalizers = append(hiveTable.Finalizers, hiveTableFinalizer)
newHiveTable, err := op.meteringClient.MeteringV1().HiveTables(hiveTable.Namespace).Update(context.TODO(), hiveTable, metav1.UpdateOptions{})
logger := op.logger.WithFields(log.Fields{"hiveTable": hiveTable.Name, "namespace": hiveTable.Namespace})
Expand All @@ -287,7 +287,7 @@ func (op *Reporting) addHiveTableFinalizer(hiveTable *metering.HiveTable) (*mete
return newHiveTable, nil
}

func (op *Reporting) removeHiveTableFinalizer(hiveTable *metering.HiveTable) (*metering.HiveTable, error) {
func (op *defaultReportingOperator) removeHiveTableFinalizer(hiveTable *metering.HiveTable) (*metering.HiveTable, error) {
if !slice.ContainsString(hiveTable.ObjectMeta.Finalizers, hiveTableFinalizer, nil) {
return hiveTable, nil
}
Expand All @@ -306,7 +306,7 @@ func hiveTableNeedsFinalizer(hiveTable *metering.HiveTable) bool {
return hiveTable.ObjectMeta.DeletionTimestamp == nil && !slice.ContainsString(hiveTable.ObjectMeta.Finalizers, hiveTableFinalizer, nil)
}

func (op *Reporting) createHiveTableCR(obj metav1.Object, gvk schema.GroupVersionKind, params hive.TableParameters, managePartitions bool, partitions []hive.TablePartition) (*metering.HiveTable, error) {
func (op *defaultReportingOperator) createHiveTableCR(obj metav1.Object, gvk schema.GroupVersionKind, params hive.TableParameters, managePartitions bool, partitions []hive.TablePartition) (*metering.HiveTable, error) {
apiVersion := gvk.GroupVersion().String()
kind := gvk.Kind
name := obj.GetName()
Expand Down Expand Up @@ -378,7 +378,7 @@ func (op *Reporting) createHiveTableCR(obj metav1.Object, gvk schema.GroupVersio
return hiveTable, nil
}

func (op *Reporting) waitForHiveTable(namespace, name string, pollInterval, timeout time.Duration) (*metering.HiveTable, error) {
func (op *defaultReportingOperator) waitForHiveTable(namespace, name string, pollInterval, timeout time.Duration) (*metering.HiveTable, error) {
var hiveTable *metering.HiveTable
err := wait.Poll(pollInterval, timeout, func() (bool, error) {
var err error
Expand All @@ -403,7 +403,7 @@ func (op *Reporting) waitForHiveTable(namespace, name string, pollInterval, time
return hiveTable, nil
}

func (op *Reporting) dropHiveTable(hiveTable *metering.HiveTable) error {
func (op *defaultReportingOperator) dropHiveTable(hiveTable *metering.HiveTable) error {
tableName := hiveTable.Status.TableName
databaseName := hiveTable.Status.DatabaseName
if tableName == "" {
Expand Down
Loading

0 comments on commit 9b659da

Please sign in to comment.