Skip to content

chore(grafana controller): Merge startup sync #2013

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions api/v1beta1/namespaced_resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ type NamespacedResource string

type NamespacedResourceList []NamespacedResource

// +kubebuilder:object:generate=false
type NamespacedResourceImpl[T interface{}] interface {
Find(namespace string, name string) *T
}

func (in NamespacedResource) Split() (string, string, string) {
parts := strings.Split(string(in), "/")
return parts[0], parts[1], parts[2]
Expand Down
82 changes: 2 additions & 80 deletions controllers/contactpoint_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"encoding/json"
"fmt"
"strings"
"time"

kuberr "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
Expand All @@ -37,7 +36,6 @@ import (
"github.com/grafana/grafana-openapi-client-go/models"
grafanav1beta1 "github.com/grafana/grafana-operator/v5/api/v1beta1"
client2 "github.com/grafana/grafana-operator/v5/controllers/client"
"github.com/grafana/grafana-operator/v5/controllers/metrics"
)

const (
Expand All @@ -54,55 +52,6 @@ type GrafanaContactPointReconciler struct {
//+kubebuilder:rbac:groups=grafana.integreatly.org,resources=grafanacontactpoints/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=grafana.integreatly.org,resources=grafanacontactpoints/finalizers,verbs=update

func (r *GrafanaContactPointReconciler) syncStatuses(ctx context.Context) error {
log := logf.FromContext(ctx)

// get all grafana instances
grafanas := &grafanav1beta1.GrafanaList{}
var opts []client.ListOption
err := r.List(ctx, grafanas, opts...)
if err != nil {
return err
}
// no instances, no need to sync
if len(grafanas.Items) == 0 {
return nil
}

// get all contact points
allContactPoints := &grafanav1beta1.GrafanaContactPointList{}
err = r.List(ctx, allContactPoints, opts...)
if err != nil {
return err
}

// delete contact points from grafana statuses that do no longer have a cr
contactpointsSynced := 0
for _, grafana := range grafanas.Items {
statusUpdated := false
for _, contactpoint := range grafana.Status.ContactPoints {
namespace, name, _ := contactpoint.Split()
if allContactPoints.Find(namespace, name) == nil {
grafana.Status.ContactPoints = grafana.Status.ContactPoints.Remove(namespace, name)
contactpointsSynced += 1
statusUpdated = true
}
}

if statusUpdated {
err = r.Client.Status().Update(ctx, &grafana)
if err != nil {
return err
}
}
}

if contactpointsSynced > 0 {
log.Info("successfully synced contact points", "contactpoints", contactpointsSynced)
}
return nil
}

func (r *GrafanaContactPointReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := logf.FromContext(ctx).WithName("GrafanaContactPointReconciler")
ctx = logf.IntoContext(ctx, log)
Expand Down Expand Up @@ -290,36 +239,9 @@ func (r *GrafanaContactPointReconciler) finalize(ctx context.Context, contactPoi
}

// SetupWithManager sets up the controller with the Manager.
func (r *GrafanaContactPointReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error {
err := ctrl.NewControllerManagedBy(mgr).
func (r *GrafanaContactPointReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&grafanav1beta1.GrafanaContactPoint{}).
WithEventFilter(ignoreStatusUpdates()).
Complete(r)
if err != nil {
return err
}

go func() {
log := logf.FromContext(ctx).WithName("GrafanaContactPointReconciler")
for {
select {
case <-ctx.Done():
return
case <-time.After(initialSyncDelay):
start := time.Now()
err := r.syncStatuses(ctx)
elapsed := time.Since(start).Milliseconds()
metrics.InitialContactPointSyncDuration.Set(float64(elapsed))
if err != nil {
log.Error(err, "error synchronizing contact points")
continue
}

log.Info("contact point sync complete")
return
}
}
}()

return nil
}
80 changes: 1 addition & 79 deletions controllers/dashboard_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"net/http"
"reflect"
"strings"
"time"

"k8s.io/utils/strings/slices"

Expand All @@ -35,7 +34,6 @@ import (
"github.com/grafana/grafana-operator/v5/api/v1beta1"
client2 "github.com/grafana/grafana-operator/v5/controllers/client"
"github.com/grafana/grafana-operator/v5/controllers/content"
"github.com/grafana/grafana-operator/v5/controllers/metrics"
corev1 "k8s.io/api/core/v1"
kuberr "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
Expand Down Expand Up @@ -65,55 +63,6 @@ type GrafanaDashboardReconciler struct {
//+kubebuilder:rbac:groups=grafana.integreatly.org,resources=grafanadashboards/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=grafana.integreatly.org,resources=grafanadashboards/finalizers,verbs=update

func (r *GrafanaDashboardReconciler) syncStatuses(ctx context.Context) error {
log := logf.FromContext(ctx)

// get all grafana instances
grafanas := &v1beta1.GrafanaList{}
var opts []client.ListOption
err := r.List(ctx, grafanas, opts...)
if err != nil {
return err
}
// no instances, no need to sync
if len(grafanas.Items) == 0 {
return nil
}

// get all dashboards
allDashboards := &v1beta1.GrafanaDashboardList{}
err = r.List(ctx, allDashboards, opts...)
if err != nil {
return err
}

// delete dashboards from grafana statuses that do no longer have a cr
dashboardsSynced := 0
for _, grafana := range grafanas.Items {
statusUpdated := false
for _, dashboard := range grafana.Status.Dashboards {
namespace, name, _ := dashboard.Split()
if allDashboards.Find(namespace, name) == nil {
grafana.Status.Dashboards = grafana.Status.Dashboards.Remove(namespace, name)
dashboardsSynced += 1
statusUpdated = true
}
}

if statusUpdated {
err = r.Client.Status().Update(ctx, &grafana)
if err != nil {
return err
}
}
}

if dashboardsSynced > 0 {
log.Info("successfully synced dashboards", "dashboards", dashboardsSynced)
}
return nil
}

func (r *GrafanaDashboardReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { //nolint:gocyclo
log := logf.FromContext(ctx).WithName("GrafanaDashboardReconciler")
ctx = logf.IntoContext(ctx, log)
Expand Down Expand Up @@ -576,7 +525,7 @@ func (r *GrafanaDashboardReconciler) SetupWithManager(ctx context.Context, mgr c
return fmt.Errorf("failed setting index fields: %w", err)
}

err := ctrl.NewControllerManagedBy(mgr).
return ctrl.NewControllerManagedBy(mgr).
For(&v1beta1.GrafanaDashboard{}, builder.WithPredicates(
ignoreStatusUpdates(),
)).
Expand All @@ -585,33 +534,6 @@ func (r *GrafanaDashboardReconciler) SetupWithManager(ctx context.Context, mgr c
handler.EnqueueRequestsFromMapFunc(r.requestsForChangeByField(configMapIndexKey)),
).
Complete(r)
if err != nil {
return err
}

go func() {
log := logf.FromContext(ctx).WithName("GrafanaDashboardReconciler")
for {
select {
case <-ctx.Done():
return
case <-time.After(initialSyncDelay):
start := time.Now()
err := r.syncStatuses(ctx)
elapsed := time.Since(start).Milliseconds()
metrics.InitialDashboardSyncDuration.Set(float64(elapsed))
if err != nil {
log.Error(err, "error synchronizing dashboards")
continue
}

log.Info("dashboard sync complete")
return
}
}
}()

return nil
}

func (r *GrafanaDashboardReconciler) indexConfigMapSource() func(o client.Object) []string {
Expand Down
85 changes: 2 additions & 83 deletions controllers/datasource_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,11 @@ import (
"errors"
"fmt"
"strings"
"time"

simplejson "github.com/bitly/go-simplejson"
"github.com/grafana/grafana-openapi-client-go/client/datasources"
"github.com/grafana/grafana-openapi-client-go/models"

"github.com/grafana/grafana-operator/v5/controllers/metrics"

genapi "github.com/grafana/grafana-openapi-client-go/client"
client2 "github.com/grafana/grafana-operator/v5/controllers/client"
kuberr "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -58,55 +55,6 @@ type GrafanaDatasourceReconciler struct {
//+kubebuilder:rbac:groups=grafana.integreatly.org,resources=grafanadatasources/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=grafana.integreatly.org,resources=grafanadatasources/finalizers,verbs=update

func (r *GrafanaDatasourceReconciler) syncStatuses(ctx context.Context) error {
log := logf.FromContext(ctx)

// get all grafana instances
grafanas := &v1beta1.GrafanaList{}
var opts []client.ListOption
err := r.List(ctx, grafanas, opts...)
if err != nil {
return err
}
// no instances, no need to sync
if len(grafanas.Items) == 0 {
return nil
}

// get all datasources
allDatasource := &v1beta1.GrafanaDatasourceList{}
err = r.List(ctx, allDatasource, opts...)
if err != nil {
return err
}

// delete datasources datasourcegrafana statuses that no longer have a CR
datasourcesSynced := 0
for _, grafana := range grafanas.Items {
statusUpdated := false
for _, ds := range grafana.Status.Datasources {
namespace, name, _ := ds.Split()
if allDatasource.Find(namespace, name) == nil {
grafana.Status.Datasources = grafana.Status.Datasources.Remove(namespace, name)
datasourcesSynced += 1
statusUpdated = true
}
}

if statusUpdated {
err = r.Status().Update(ctx, &grafana)
if err != nil {
return err
}
}
}

if datasourcesSynced > 0 {
log.Info("successfully synced datasources", "datasources", datasourcesSynced)
}
return nil
}

func (r *GrafanaDatasourceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := logf.FromContext(ctx).WithName("GrafanaDatasourceReconciler")
ctx = logf.IntoContext(ctx, log)
Expand Down Expand Up @@ -374,40 +322,11 @@ func (r *GrafanaDatasourceReconciler) Exists(client *genapi.GrafanaHTTPAPI, uid,
}

// SetupWithManager sets up the controller with the Manager.
func (r *GrafanaDatasourceReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error {
err := ctrl.NewControllerManagedBy(mgr).
func (r *GrafanaDatasourceReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&v1beta1.GrafanaDatasource{}).
WithEventFilter(ignoreStatusUpdates()).
Complete(r)
if err != nil {
return err
}

go func() {
// periodic sync reconcile
log := logf.FromContext(ctx).WithName("GrafanaDatasourceReconciler")

for {
select {
case <-ctx.Done():
return
case <-time.After(initialSyncDelay):
start := time.Now()
err := r.syncStatuses(ctx)
elapsed := time.Since(start).Milliseconds()
metrics.InitialDatasourceSyncDuration.Set(float64(elapsed))
if err != nil {
log.Error(err, "error synchronizing datasources")
continue
}

log.Info("datasource sync complete")
return
}
}
}()

return nil
}

func (r *GrafanaDatasourceReconciler) buildDatasourceModel(ctx context.Context, cr *v1beta1.GrafanaDatasource) (*models.UpdateDataSourceCommand, string, error) {
Expand Down
Loading