diff --git a/pkg/client/artifacts.go b/pkg/client/artifacts.go
new file mode 100644
index 0000000..3a31c5c
--- /dev/null
+++ b/pkg/client/artifacts.go
@@ -0,0 +1,150 @@
+package client
+
+import (
+	"context"
+	"fmt"
+
+	"git.schultes.dev/schultesdev/prefect-go/pkg/models"
+	"git.schultes.dev/schultesdev/prefect-go/pkg/pagination"
+	"github.com/google/uuid"
+)
+
+// ArtifactsService handles operations related to artifacts.
+type ArtifactsService struct {
+	client *Client
+}
+
+// Create creates a new artifact.
+func (s *ArtifactsService) Create(ctx context.Context, req *models.ArtifactCreate) (*models.Artifact, error) {
+	var artifact models.Artifact
+	if err := s.client.post(ctx, "/artifacts/", req, &artifact); err != nil {
+		return nil, fmt.Errorf("failed to create artifact: %w", err)
+	}
+	return &artifact, nil
+}
+
+// Get retrieves an artifact by ID.
+func (s *ArtifactsService) Get(ctx context.Context, id uuid.UUID) (*models.Artifact, error) {
+	var artifact models.Artifact
+	path := joinPath("/artifacts", id.String())
+	if err := s.client.get(ctx, path, &artifact); err != nil {
+		return nil, fmt.Errorf("failed to get artifact: %w", err)
+	}
+	return &artifact, nil
+}
+
+// GetLatest retrieves the latest artifact for a given key.
+func (s *ArtifactsService) GetLatest(ctx context.Context, key string) (*models.Artifact, error) {
+	var artifact models.Artifact
+	path := joinPath("/artifacts", key, "latest")
+	if err := s.client.get(ctx, path, &artifact); err != nil {
+		return nil, fmt.Errorf("failed to get latest artifact: %w", err)
+	}
+	return &artifact, nil
+}
+
+// Update updates an artifact.
+func (s *ArtifactsService) Update(ctx context.Context, id uuid.UUID, req *models.ArtifactUpdate) error {
+	path := joinPath("/artifacts", id.String())
+	if err := s.client.patch(ctx, path, req, nil); err != nil {
+		return fmt.Errorf("failed to update artifact: %w", err)
+	}
+	return nil
+}
+
+// Delete deletes an artifact by ID.
+func (s *ArtifactsService) Delete(ctx context.Context, id uuid.UUID) error {
+	path := joinPath("/artifacts", id.String())
+	if err := s.client.delete(ctx, path); err != nil {
+		return fmt.Errorf("failed to delete artifact: %w", err)
+	}
+	return nil
+}
+
+// List retrieves a list of artifacts with optional filtering.
+func (s *ArtifactsService) List(ctx context.Context, filter *models.ArtifactFilter, offset, limit int) (*pagination.PaginatedResponse[models.Artifact], error) {
+	if filter == nil {
+		filter = &models.ArtifactFilter{}
+	}
+	filter.Offset = offset
+	filter.Limit = limit
+
+	type response struct {
+		Results []models.Artifact `json:"results"`
+		Count   int               `json:"count"`
+	}
+
+	var resp response
+	if err := s.client.post(ctx, "/artifacts/filter", filter, &resp); err != nil {
+		return nil, fmt.Errorf("failed to list artifacts: %w", err)
+	}
+
+	return &pagination.PaginatedResponse[models.Artifact]{
+		Results: resp.Results,
+		Count:   resp.Count,
+		Limit:   limit,
+		Offset:  offset,
+		HasMore: offset+len(resp.Results) < resp.Count,
+	}, nil
+}
+
+// ListAll returns an iterator for all artifacts matching the filter.
+func (s *ArtifactsService) ListAll(ctx context.Context, filter *models.ArtifactFilter) *pagination.Iterator[models.Artifact] {
+	fetchFunc := func(ctx context.Context, offset, limit int) (*pagination.PaginatedResponse[models.Artifact], error) {
+		return s.List(ctx, filter, offset, limit)
+	}
+	return pagination.NewIterator(fetchFunc, 100)
+}
+
+// Count returns the number of artifacts matching the filter.
+func (s *ArtifactsService) Count(ctx context.Context, filter *models.ArtifactFilter) (int, error) {
+	if filter == nil {
+		filter = &models.ArtifactFilter{}
+	}
+
+	var count int
+	if err := s.client.post(ctx, "/artifacts/count", filter, &count); err != nil {
+		return 0, fmt.Errorf("failed to count artifacts: %w", err)
+	}
+	return count, nil
+}
+
+// ListLatest retrieves the latest artifact collections with optional filtering.
+func (s *ArtifactsService) ListLatest(ctx context.Context, filter *models.ArtifactCollectionFilter, offset, limit int) (*pagination.PaginatedResponse[models.ArtifactCollection], error) {
+	if filter == nil {
+		filter = &models.ArtifactCollectionFilter{}
+	}
+	filter.Offset = offset
+	filter.Limit = limit
+
+	type response struct {
+		Results []models.ArtifactCollection `json:"results"`
+		Count   int                         `json:"count"`
+	}
+
+	var resp response
+	if err := s.client.post(ctx, "/artifacts/latest/filter", filter, &resp); err != nil {
+		return nil, fmt.Errorf("failed to list latest artifacts: %w", err)
+	}
+
+	return &pagination.PaginatedResponse[models.ArtifactCollection]{
+		Results: resp.Results,
+		Count:   resp.Count,
+		Limit:   limit,
+		Offset:  offset,
+		HasMore: offset+len(resp.Results) < resp.Count,
+	}, nil
+}
+
+// CountLatest returns the number of latest artifact collections matching the filter.
+func (s *ArtifactsService) CountLatest(ctx context.Context, filter *models.ArtifactCollectionFilter) (int, error) {
+	if filter == nil {
+		filter = &models.ArtifactCollectionFilter{}
+	}
+
+	var count int
+	if err := s.client.post(ctx, "/artifacts/latest/count", filter, &count); err != nil {
+		return 0, fmt.Errorf("failed to count latest artifacts: %w", err)
+	}
+	return count, nil
+}
diff --git a/pkg/client/artifacts_test.go b/pkg/client/artifacts_test.go
new file mode 100644
index 0000000..1a43b56
--- /dev/null
+++ b/pkg/client/artifacts_test.go
@@ -0,0 +1,139 @@
+package client
+
+import (
+	"context"
+	"encoding/json"
+	"net/http"
+	"net/http/httptest"
+	"testing"
+
+	"git.schultes.dev/schultesdev/prefect-go/pkg/models"
+	"github.com/google/uuid"
+)
+
+func TestArtifactsService_Create(t *testing.T) {
+	expected := models.Artifact{
+		ID:  uuid.New(),
+		Key: strPtr("test-key"),
+	}
+
+	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		if r.URL.Path != "/api/artifacts/" {
+			t.Errorf("path = %v, want /api/artifacts/", r.URL.Path)
+		}
+		if r.Method != http.MethodPost {
+			t.Errorf("method = %v, want POST", r.Method)
+		}
+		w.Header().Set("Content-Type", "application/json")
+		json.NewEncoder(w).Encode(expected)
+	}))
+	defer server.Close()
+
+	client, _ := NewClient(WithBaseURL(server.URL + "/api"))
+	ctx := context.Background()
+
+	artifact, err := client.Artifacts.Create(ctx, &models.ArtifactCreate{Key: strPtr("test-key")})
+	if err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+	if artifact.ID != expected.ID {
+		t.Errorf("ID = %v, want %v", artifact.ID, expected.ID)
+	}
+}
+
+func TestArtifactsService_Get(t *testing.T) {
+	id := uuid.New()
+	expected := models.Artifact{ID: id, Key: strPtr("test-key")}
+
+	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		if r.Method != http.MethodGet {
+			t.Errorf("method = %v, want GET", r.Method)
+		}
+		w.Header().Set("Content-Type", "application/json")
+		json.NewEncoder(w).Encode(expected)
+	}))
+	defer server.Close()
+
+	client, _ := NewClient(WithBaseURL(server.URL + "/api"))
+	ctx := context.Background()
+
+	artifact, err := client.Artifacts.Get(ctx, id)
+	if err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+	if artifact.ID != id {
+		t.Errorf("ID = %v, want %v", artifact.ID, id)
+	}
+}
+
+func TestArtifactsService_Delete(t *testing.T) {
+	id := uuid.New()
+
+	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		if r.Method != http.MethodDelete {
+			t.Errorf("method = %v, want DELETE", r.Method)
+		}
+		w.WriteHeader(http.StatusNoContent)
+	}))
+	defer server.Close()
+
+	client, _ := NewClient(WithBaseURL(server.URL + "/api"))
+	ctx := context.Background()
+
+	err := client.Artifacts.Delete(ctx, id)
+	if err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+}
+
+func TestArtifactsService_List(t *testing.T) {
+	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		if r.Method != http.MethodPost {
+			t.Errorf("method = %v, want POST", r.Method)
+		}
+		if r.URL.Path != "/api/artifacts/filter" {
+			t.Errorf("path = %v, want /api/artifacts/filter", r.URL.Path)
+		}
+		w.Header().Set("Content-Type", "application/json")
+		resp := map[string]interface{}{
+			"results": []models.Artifact{{ID: uuid.New()}},
+			"count":   1,
+		}
+		json.NewEncoder(w).Encode(resp)
+	}))
+	defer server.Close()
+
+	client, _ := NewClient(WithBaseURL(server.URL + "/api"))
+	ctx := context.Background()
+
+	result, err := client.Artifacts.List(ctx, nil, 0, 10)
+	if err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+	if len(result.Results) != 1 {
+		t.Errorf("results count = %v, want 1", len(result.Results))
+	}
+}
+
+func TestArtifactsService_Count(t *testing.T) {
+	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		w.Header().Set("Content-Type", "application/json")
+		w.Write([]byte(`5`))
+	}))
+	defer server.Close()
+
+	client, _ := NewClient(WithBaseURL(server.URL + "/api"))
+	ctx := context.Background()
+
+	count, err := client.Artifacts.Count(ctx, nil)
+	if err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+	if count != 5 {
+		t.Errorf("count = %v, want 5", count)
+	}
+}
+
+func strPtr(s string) *string {
+	return &s
+}
diff --git a/pkg/client/automations.go b/pkg/client/automations.go
new file mode 100644
index 0000000..fc1dffc
--- /dev/null
+++ b/pkg/client/automations.go
@@ -0,0 +1,120 @@
+package client
+
+import (
+	"context"
+	"fmt"
+	"net/http"
+
+	"git.schultes.dev/schultesdev/prefect-go/pkg/models"
+	"github.com/google/uuid"
+)
+
+// AutomationsService handles operations related to automations.
+type AutomationsService struct {
+	client *Client
+}
+
+// Create creates a new automation.
+func (s *AutomationsService) Create(ctx context.Context, req *models.AutomationCreate) (*models.Automation, error) {
+	var automation models.Automation
+	if err := s.client.do(ctx, http.MethodPost, "/automations/", req, &automation); err != nil {
+		return nil, fmt.Errorf("failed to create automation: %w", err)
+	}
+	return &automation, nil
+}
+
+// Get retrieves an automation by ID.
+func (s *AutomationsService) Get(ctx context.Context, id uuid.UUID) (*models.Automation, error) {
+	var automation models.Automation
+	path := joinPath("/automations", id.String())
+	if err := s.client.get(ctx, path, &automation); err != nil {
+		return nil, fmt.Errorf("failed to get automation: %w", err)
+	}
+	return &automation, nil
+}
+
+// Update fully replaces an automation.
+func (s *AutomationsService) Update(ctx context.Context, id uuid.UUID, req *models.AutomationUpdate) (*models.Automation, error) {
+	var automation models.Automation
+	path := joinPath("/automations", id.String())
+	if err := s.client.do(ctx, http.MethodPut, path, req, &automation); err != nil {
+		return nil, fmt.Errorf("failed to update automation: %w", err)
+	}
+	return &automation, nil
+}
+
+// Patch partially updates an automation.
+func (s *AutomationsService) Patch(ctx context.Context, id uuid.UUID, req *models.AutomationPartialUpdate) error {
+	path := joinPath("/automations", id.String())
+	if err := s.client.patch(ctx, path, req, nil); err != nil {
+		return fmt.Errorf("failed to patch automation: %w", err)
+	}
+	return nil
+}
+
+// Delete deletes an automation by ID.
+func (s *AutomationsService) Delete(ctx context.Context, id uuid.UUID) error {
+	path := joinPath("/automations", id.String())
+	if err := s.client.delete(ctx, path); err != nil {
+		return fmt.Errorf("failed to delete automation: %w", err)
+	}
+	return nil
+}
+
+// List retrieves automations with optional filtering.
+func (s *AutomationsService) List(ctx context.Context, filter *models.AutomationFilter) ([]models.Automation, error) {
+	if filter == nil {
+		filter = &models.AutomationFilter{}
+	}
+
+	var automations []models.Automation
+	if err := s.client.post(ctx, "/automations/filter", filter, &automations); err != nil {
+		return nil, fmt.Errorf("failed to list automations: %w", err)
+	}
+	return automations, nil
+}
+
+// Count returns the number of automations matching the filter.
+func (s *AutomationsService) Count(ctx context.Context, filter *models.AutomationFilter) (int, error) {
+	if filter == nil {
+		filter = &models.AutomationFilter{}
+	}
+
+	var count int
+	if err := s.client.post(ctx, "/automations/count", filter, &count); err != nil {
+		return 0, fmt.Errorf("failed to count automations: %w", err)
+	}
+	return count, nil
+}
+
+// GetRelatedTo retrieves automations related to a specific resource.
+func (s *AutomationsService) GetRelatedTo(ctx context.Context, resourceID string) ([]models.Automation, error) {
+	var automations []models.Automation
+	path := joinPath("/automations/related-to", resourceID)
+	if err := s.client.get(ctx, path, &automations); err != nil {
+		return nil, fmt.Errorf("failed to get related automations: %w", err)
+	}
+	return automations, nil
+}
+
+// DeleteOwnedBy deletes automations owned by a specific resource.
+func (s *AutomationsService) DeleteOwnedBy(ctx context.Context, resourceID string) error {
+	path := joinPath("/automations/owned-by", resourceID)
+	if err := s.client.delete(ctx, path); err != nil {
+		return fmt.Errorf("failed to delete owned automations: %w", err)
+	}
+	return nil
+}
+
+// ValidateTemplate validates an automation template.
+func (s *AutomationsService) ValidateTemplate(ctx context.Context, template string) error {
+	req := struct {
+		Template string `json:"template"`
+	}{
+		Template: template,
+	}
+	if err := s.client.post(ctx, "/automations/templates/validate", req, nil); err != nil {
+		return fmt.Errorf("failed to validate template: %w", err)
+	}
+	return nil
+}
diff --git a/pkg/client/automations_test.go b/pkg/client/automations_test.go
new file mode 100644
index 0000000..ba5370a
--- /dev/null
+++ b/pkg/client/automations_test.go
@@ -0,0 +1,135 @@
+package client
+
+import (
+	"context"
+	"encoding/json"
+	"net/http"
+	"net/http/httptest"
+	"testing"
+
+	"git.schultes.dev/schultesdev/prefect-go/pkg/models"
+	"github.com/google/uuid"
+)
+
+func TestAutomationsService_Create(t *testing.T) {
+	expected := models.Automation{
+		ID:   uuid.New(),
+		Name: "test-automation",
+	}
+
+	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		if r.URL.Path != "/api/automations/" {
+			t.Errorf("path = %v, want /api/automations/", r.URL.Path)
+		}
+		if r.Method != http.MethodPost {
+			t.Errorf("method = %v, want POST", r.Method)
+		}
+		w.Header().Set("Content-Type", "application/json")
+		w.WriteHeader(http.StatusCreated)
+		json.NewEncoder(w).Encode(expected)
+	}))
+	defer server.Close()
+
+	client, _ := NewClient(WithBaseURL(server.URL + "/api"))
+	ctx := context.Background()
+
+	automation, err := client.Automations.Create(ctx, &models.AutomationCreate{
+		Name:    "test-automation",
+		Trigger: json.RawMessage(`{"type":"event"}`),
+		Actions: []json.RawMessage{json.RawMessage(`{"type":"do-nothing"}`)},
+	})
+	if err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+	if automation.Name != expected.Name {
+		t.Errorf("Name = %v, want %v", automation.Name, expected.Name)
+	}
+}
+
+func TestAutomationsService_Get(t *testing.T) {
+	id := uuid.New()
+	expected := models.Automation{ID: id, Name: "test"}
+
+	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		if r.Method != http.MethodGet {
+			t.Errorf("method = %v, want GET", r.Method)
+		}
+		w.Header().Set("Content-Type", "application/json")
+		json.NewEncoder(w).Encode(expected)
+	}))
+	defer server.Close()
+
+	client, _ := NewClient(WithBaseURL(server.URL + "/api"))
+	ctx := context.Background()
+
+	automation, err := client.Automations.Get(ctx, id)
+	if err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+	if automation.ID != id {
+		t.Errorf("ID = %v, want %v", automation.ID, id)
+	}
+}
+
+func TestAutomationsService_Delete(t *testing.T) {
+	id := uuid.New()
+
+	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		if r.Method != http.MethodDelete {
+			t.Errorf("method = %v, want DELETE", r.Method)
+		}
+		w.WriteHeader(http.StatusNoContent)
+	}))
+	defer server.Close()
+
+	client, _ := NewClient(WithBaseURL(server.URL + "/api"))
+	ctx := context.Background()
+
+	err := client.Automations.Delete(ctx, id)
+	if err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+}
+
+func TestAutomationsService_List(t *testing.T) {
+	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		if r.URL.Path != "/api/automations/filter" {
+			t.Errorf("path = %v, want /api/automations/filter", r.URL.Path)
+		}
+		w.Header().Set("Content-Type", "application/json")
+		json.NewEncoder(w).Encode([]models.Automation{{ID: uuid.New(), Name: "test"}})
+	}))
+	defer server.Close()
+
+	client, _ := NewClient(WithBaseURL(server.URL + "/api"))
+	ctx := context.Background()
+
+	automations, err := client.Automations.List(ctx, nil)
+	if err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+	if len(automations) != 1 {
+		t.Errorf("count = %v, want 1", len(automations))
+	}
+}
+
+func TestAutomationsService_Patch(t *testing.T) {
+	id := uuid.New()
+	enabled := true
+
+	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		if r.Method != http.MethodPatch {
+			t.Errorf("method = %v, want PATCH", r.Method)
+		}
+		w.WriteHeader(http.StatusNoContent)
+	}))
+	defer server.Close()
+
+	client, _ := NewClient(WithBaseURL(server.URL + "/api"))
+	ctx := context.Background()
+
+	err := client.Automations.Patch(ctx, id, &models.AutomationPartialUpdate{Enabled: &enabled})
+	if err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+}
diff --git a/pkg/client/concurrency_limits.go b/pkg/client/concurrency_limits.go
new file mode 100644
index 0000000..3fe04bc
--- /dev/null
+++ b/pkg/client/concurrency_limits.go
@@ -0,0 +1,114 @@
+package client
+
+import (
+	"context"
+	"fmt"
+
+	"git.schultes.dev/schultesdev/prefect-go/pkg/models"
+	"github.com/google/uuid"
+)
+
+// ConcurrencyLimitsService handles operations related to v1 concurrency limits.
+type ConcurrencyLimitsService struct {
+	client *Client
+}
+
+// Create creates a new concurrency limit.
+func (s *ConcurrencyLimitsService) Create(ctx context.Context, req *models.ConcurrencyLimitCreate) (*models.ConcurrencyLimit, error) {
+	var limit models.ConcurrencyLimit
+	if err := s.client.post(ctx, "/concurrency_limits/", req, &limit); err != nil {
+		return nil, fmt.Errorf("failed to create concurrency limit: %w", err)
+	}
+	return &limit, nil
+}
+
+// Get retrieves a concurrency limit by ID.
+func (s *ConcurrencyLimitsService) Get(ctx context.Context, id uuid.UUID) (*models.ConcurrencyLimit, error) {
+	var limit models.ConcurrencyLimit
+	path := joinPath("/concurrency_limits", id.String())
+	if err := s.client.get(ctx, path, &limit); err != nil {
+		return nil, fmt.Errorf("failed to get concurrency limit: %w", err)
+	}
+	return &limit, nil
+}
+
+// GetByTag retrieves a concurrency limit by tag.
+func (s *ConcurrencyLimitsService) GetByTag(ctx context.Context, tag string) (*models.ConcurrencyLimit, error) {
+	var limit models.ConcurrencyLimit
+	path := joinPath("/concurrency_limits/tag", tag)
+	if err := s.client.get(ctx, path, &limit); err != nil {
+		return nil, fmt.Errorf("failed to get concurrency limit by tag: %w", err)
+	}
+	return &limit, nil
+}
+
+// Delete deletes a concurrency limit by ID.
+func (s *ConcurrencyLimitsService) Delete(ctx context.Context, id uuid.UUID) error {
+	path := joinPath("/concurrency_limits", id.String())
+	if err := s.client.delete(ctx, path); err != nil {
+		return fmt.Errorf("failed to delete concurrency limit: %w", err)
+	}
+	return nil
+}
+
+// DeleteByTag deletes a concurrency limit by tag.
+func (s *ConcurrencyLimitsService) DeleteByTag(ctx context.Context, tag string) error {
+	path := joinPath("/concurrency_limits/tag", tag)
+	if err := s.client.delete(ctx, path); err != nil {
+		return fmt.Errorf("failed to delete concurrency limit by tag: %w", err)
+	}
+	return nil
+}
+
+// ResetByTag resets a concurrency limit by tag.
+func (s *ConcurrencyLimitsService) ResetByTag(ctx context.Context, tag string) error {
+	path := joinPath("/concurrency_limits/tag", tag, "reset")
+	if err := s.client.post(ctx, path, nil, nil); err != nil {
+		return fmt.Errorf("failed to reset concurrency limit: %w", err)
+	}
+	return nil
+}
+
+// List retrieves concurrency limits with optional filtering.
+func (s *ConcurrencyLimitsService) List(ctx context.Context) ([]models.ConcurrencyLimit, error) {
+	var limits []models.ConcurrencyLimit
+	if err := s.client.post(ctx, "/concurrency_limits/filter", nil, &limits); err != nil {
+		return nil, fmt.Errorf("failed to list concurrency limits: %w", err)
+	}
+	return limits, nil
+}
+
+// Increment increments concurrency limit slots.
+func (s *ConcurrencyLimitsService) Increment(ctx context.Context, names []string, slots int, mode string) ([]models.MinimalConcurrencyLimitResponse, error) {
+	req := struct {
+		Names []string `json:"names"`
+		Slots int      `json:"slots"`
+		Mode  string   `json:"mode,omitempty"`
+	}{
+		Names: names,
+		Slots: slots,
+		Mode:  mode,
+	}
+
+	var resp []models.MinimalConcurrencyLimitResponse
+	if err := s.client.post(ctx, "/concurrency_limits/increment", req, &resp); err != nil {
+		return nil, fmt.Errorf("failed to increment concurrency limits: %w", err)
+	}
+	return resp, nil
+}
+
+// Decrement decrements concurrency limit slots.
+func (s *ConcurrencyLimitsService) Decrement(ctx context.Context, names []string, slots int) error {
+	req := struct {
+		Names []string `json:"names"`
+		Slots int      `json:"slots"`
+	}{
+		Names: names,
+		Slots: slots,
+	}
+
+	if err := s.client.post(ctx, "/concurrency_limits/decrement", req, nil); err != nil {
+		return fmt.Errorf("failed to decrement concurrency limits: %w", err)
+	}
+	return nil
+}
diff --git a/pkg/client/concurrency_limits_test.go b/pkg/client/concurrency_limits_test.go
new file mode 100644
index 0000000..82b3340
--- /dev/null
+++ b/pkg/client/concurrency_limits_test.go
@@ -0,0 +1,126 @@
+package client
+
+import (
+	"context"
+	"encoding/json"
+	"net/http"
+	"net/http/httptest"
+	"testing"
+
+	"git.schultes.dev/schultesdev/prefect-go/pkg/models"
+	"github.com/google/uuid"
+)
+
+func TestConcurrencyLimitsService_Create(t *testing.T) {
+	expected := models.ConcurrencyLimit{
+		ID:               uuid.New(),
+		Tag:              "test-tag",
+		ConcurrencyLimit: 10,
+	}
+
+	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		if r.Method != http.MethodPost {
+			t.Errorf("method = %v, want POST", r.Method)
+		}
+		w.Header().Set("Content-Type", "application/json")
+		json.NewEncoder(w).Encode(expected)
+	}))
+	defer server.Close()
+
+	client, _ := NewClient(WithBaseURL(server.URL + "/api"))
+	ctx := context.Background()
+
+	limit, err := client.ConcurrencyLimits.Create(ctx, &models.ConcurrencyLimitCreate{
+		Tag:              "test-tag",
+		ConcurrencyLimit: 10,
+	})
+	if err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+	if limit.Tag != "test-tag" {
+		t.Errorf("Tag = %v, want test-tag", limit.Tag)
+	}
+}
+
+func TestConcurrencyLimitsService_Get(t *testing.T) {
+	id := uuid.New()
+
+	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		if r.Method != http.MethodGet {
+			t.Errorf("method = %v, want GET", r.Method)
+		}
+		w.Header().Set("Content-Type", "application/json")
+		json.NewEncoder(w).Encode(models.ConcurrencyLimit{ID: id, Tag: "test"})
+	}))
+	defer server.Close()
+
+	client, _ := NewClient(WithBaseURL(server.URL + "/api"))
+	ctx := context.Background()
+
+	limit, err := client.ConcurrencyLimits.Get(ctx, id)
+	if err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+	if limit.ID != id {
+		t.Errorf("ID = %v, want %v", limit.ID, id)
+	}
+}
+
+func TestConcurrencyLimitsService_GetByTag(t *testing.T) {
+	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		if r.Method != http.MethodGet {
+			t.Errorf("method = %v, want GET", r.Method)
+		}
+		w.Header().Set("Content-Type", "application/json")
+		json.NewEncoder(w).Encode(models.ConcurrencyLimit{ID: uuid.New(), Tag: "my-tag"})
+	}))
+	defer server.Close()
+
+	client, _ := NewClient(WithBaseURL(server.URL + "/api"))
+	ctx := context.Background()
+
+	limit, err := client.ConcurrencyLimits.GetByTag(ctx, "my-tag")
+	if err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+	if limit.Tag != "my-tag" {
+		t.Errorf("Tag = %v, want my-tag", limit.Tag)
+	}
+}
+
+func TestConcurrencyLimitsService_Delete(t *testing.T) {
+	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		if r.Method != http.MethodDelete {
+			t.Errorf("method = %v, want DELETE", r.Method)
+		}
+		w.WriteHeader(http.StatusNoContent)
+	}))
+	defer server.Close()
+
+	client, _ := NewClient(WithBaseURL(server.URL + "/api"))
+	ctx := context.Background()
+
+	err := client.ConcurrencyLimits.Delete(ctx, uuid.New())
+	if err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+}
+
+func TestConcurrencyLimitsService_List(t *testing.T) {
+	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		w.Header().Set("Content-Type", "application/json")
+		json.NewEncoder(w).Encode([]models.ConcurrencyLimit{{ID: uuid.New(), Tag: "test"}})
+	}))
+	defer server.Close()
+
+	client, _ := NewClient(WithBaseURL(server.URL + "/api"))
+	ctx := context.Background()
+
+	limits, err := client.ConcurrencyLimits.List(ctx)
+	if err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+	if len(limits) != 1 {
+		t.Errorf("count = %v, want 1", len(limits))
+	}
+}
diff --git a/pkg/client/concurrency_limits_v2.go b/pkg/client/concurrency_limits_v2.go
new file mode 100644
index 0000000..9b8572a
--- /dev/null
+++ b/pkg/client/concurrency_limits_v2.go
@@ -0,0 +1,141 @@
+package client
+
+import (
+	"context"
+	"fmt"
+	"net/http"
+
+	"git.schultes.dev/schultesdev/prefect-go/pkg/models"
+	"github.com/google/uuid"
+)
+
+// ConcurrencyLimitsV2Service handles operations related to v2 concurrency limits.
+type ConcurrencyLimitsV2Service struct {
+	client *Client
+}
+
+// Create creates a new v2 concurrency limit.
+func (s *ConcurrencyLimitsV2Service) Create(ctx context.Context, req *models.ConcurrencyLimitV2Create) (*models.ConcurrencyLimitV2, error) {
+	var limit models.ConcurrencyLimitV2
+	if err := s.client.do(ctx, http.MethodPost, "/v2/concurrency_limits/", req, &limit); err != nil {
+		return nil, fmt.Errorf("failed to create v2 concurrency limit: %w", err)
+	}
+	return &limit, nil
+}
+
+// Get retrieves a v2 concurrency limit by ID or name.
+func (s *ConcurrencyLimitsV2Service) Get(ctx context.Context, idOrName string) (*models.ConcurrencyLimitV2, error) {
+	var limit models.ConcurrencyLimitV2
+	path := joinPath("/v2/concurrency_limits", idOrName)
+	if err := s.client.get(ctx, path, &limit); err != nil {
+		return nil, fmt.Errorf("failed to get v2 concurrency limit: %w", err)
+	}
+	return &limit, nil
+}
+
+// Update updates a v2 concurrency limit.
+func (s *ConcurrencyLimitsV2Service) Update(ctx context.Context, idOrName string, req *models.ConcurrencyLimitV2Update) error {
+	path := joinPath("/v2/concurrency_limits", idOrName)
+	if err := s.client.patch(ctx, path, req, nil); err != nil {
+		return fmt.Errorf("failed to update v2 concurrency limit: %w", err)
+	}
+	return nil
+}
+
+// Delete deletes a v2 concurrency limit by ID or name.
+func (s *ConcurrencyLimitsV2Service) Delete(ctx context.Context, idOrName string) error {
+	path := joinPath("/v2/concurrency_limits", idOrName)
+	if err := s.client.delete(ctx, path); err != nil {
+		return fmt.Errorf("failed to delete v2 concurrency limit: %w", err)
+	}
+	return nil
+}
+
+// List retrieves all v2 concurrency limits.
+func (s *ConcurrencyLimitsV2Service) List(ctx context.Context) ([]models.GlobalConcurrencyLimitResponse, error) {
+	var limits []models.GlobalConcurrencyLimitResponse
+	if err := s.client.post(ctx, "/v2/concurrency_limits/filter", nil, &limits); err != nil {
+		return nil, fmt.Errorf("failed to list v2 concurrency limits: %w", err)
+	}
+	return limits, nil
+}
+
+// Increment increments active slots for v2 concurrency limits.
+func (s *ConcurrencyLimitsV2Service) Increment(ctx context.Context, names []string, slots int, mode string) error {
+	req := struct {
+		Names []string `json:"names"`
+		Slots int      `json:"slots"`
+		Mode  string   `json:"mode,omitempty"`
+	}{
+		Names: names,
+		Slots: slots,
+		Mode:  mode,
+	}
+
+	if err := s.client.post(ctx, "/v2/concurrency_limits/increment", req, nil); err != nil {
+		return fmt.Errorf("failed to increment v2 concurrency limits: %w", err)
+	}
+	return nil
+}
+
+// IncrementWithLease increments active slots and returns a lease.
+func (s *ConcurrencyLimitsV2Service) IncrementWithLease(ctx context.Context, names []string, slots int, mode string) (*models.ConcurrencyLimitWithLeaseResponse, error) {
+	req := struct {
+		Names []string `json:"names"`
+		Slots int      `json:"slots"`
+		Mode  string   `json:"mode,omitempty"`
+	}{
+		Names: names,
+		Slots: slots,
+		Mode:  mode,
+	}
+
+	var resp models.ConcurrencyLimitWithLeaseResponse
+	if err := s.client.post(ctx, "/v2/concurrency_limits/increment-with-lease", req, &resp); err != nil {
+		return nil, fmt.Errorf("failed to increment v2 concurrency limits with lease: %w", err)
+	}
+	return &resp, nil
+}
+
+// Decrement decrements active slots for v2 concurrency limits.
+func (s *ConcurrencyLimitsV2Service) Decrement(ctx context.Context, names []string, slots int) error {
+	req := struct {
+		Names []string `json:"names"`
+		Slots int      `json:"slots"`
+	}{
+		Names: names,
+		Slots: slots,
+	}
+
+	if err := s.client.post(ctx, "/v2/concurrency_limits/decrement", req, nil); err != nil {
+		return fmt.Errorf("failed to decrement v2 concurrency limits: %w", err)
+	}
+	return nil
+}
+
+// DecrementWithLease decrements active slots using a lease ID.
+func (s *ConcurrencyLimitsV2Service) DecrementWithLease(ctx context.Context, names []string, slots int, leaseID uuid.UUID) error {
+	req := struct {
+		Names   []string  `json:"names"`
+		Slots   int       `json:"slots"`
+		LeaseID uuid.UUID `json:"lease_id"`
+	}{
+		Names:   names,
+		Slots:   slots,
+		LeaseID: leaseID,
+	}
+
+	if err := s.client.post(ctx, "/v2/concurrency_limits/decrement-with-lease", req, nil); err != nil {
+		return fmt.Errorf("failed to decrement v2 concurrency limits with lease: %w", err)
+	}
+	return nil
+}
+
+// RenewLease renews a concurrency lease.
+func (s *ConcurrencyLimitsV2Service) RenewLease(ctx context.Context, leaseID uuid.UUID) error {
+	path := joinPath("/v2/concurrency_limits/leases", leaseID.String(), "renew")
+	if err := s.client.post(ctx, path, nil, nil); err != nil {
+		return fmt.Errorf("failed to renew concurrency lease: %w", err)
+	}
+	return nil
+}
diff --git a/pkg/client/concurrency_limits_v2_test.go b/pkg/client/concurrency_limits_v2_test.go
new file mode 100644
index 0000000..2a00262
--- /dev/null
+++ b/pkg/client/concurrency_limits_v2_test.go
@@ -0,0 +1,128 @@
+package client
+
+import (
+	"context"
+	"encoding/json"
+	"net/http"
+	"net/http/httptest"
+	"testing"
+
+	"git.schultes.dev/schultesdev/prefect-go/pkg/models"
+	"github.com/google/uuid"
+)
+
+func TestConcurrencyLimitsV2Service_Create(t *testing.T) {
+	expected := models.ConcurrencyLimitV2{
+		ID:    uuid.New(),
+		Name:  "test-limit",
+		Limit: 5,
+	}
+
+	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		if r.URL.Path != "/api/v2/concurrency_limits/" {
+			t.Errorf("path = %v, want /api/v2/concurrency_limits/", r.URL.Path)
+		}
+		if r.Method != http.MethodPost {
+			t.Errorf("method = %v, want POST", r.Method)
+		}
+		w.Header().Set("Content-Type", "application/json")
+		w.WriteHeader(http.StatusCreated)
+		json.NewEncoder(w).Encode(expected)
+	}))
+	defer server.Close()
+
+	client, _ := NewClient(WithBaseURL(server.URL + "/api"))
+	ctx := context.Background()
+
+	limit, err := client.ConcurrencyLimitsV2.Create(ctx, &models.ConcurrencyLimitV2Create{
+		Name:  "test-limit",
+		Limit: 5,
+	})
+	if err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+	if limit.Name != "test-limit" {
+		t.Errorf("Name = %v, want test-limit", limit.Name)
+	}
+}
+
+func TestConcurrencyLimitsV2Service_Get(t *testing.T) {
+	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		if r.Method != http.MethodGet {
+			t.Errorf("method = %v, want GET", r.Method)
+		}
+		w.Header().Set("Content-Type", "application/json")
+		json.NewEncoder(w).Encode(models.ConcurrencyLimitV2{ID: uuid.New(), Name: "test"})
+	}))
+	defer server.Close()
+
+	client, _ := NewClient(WithBaseURL(server.URL + "/api"))
+	ctx := context.Background()
+
+	limit, err := client.ConcurrencyLimitsV2.Get(ctx, "test")
+	if err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+	if limit.Name != "test" {
+		t.Errorf("Name = %v, want test", limit.Name)
+	}
+}
+
+func TestConcurrencyLimitsV2Service_Delete(t *testing.T) {
+	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		if r.Method != http.MethodDelete {
+			t.Errorf("method = %v, want DELETE", r.Method)
+		}
+		w.WriteHeader(http.StatusNoContent)
+	}))
+	defer server.Close()
+
+	client, _ := NewClient(WithBaseURL(server.URL + "/api"))
+	ctx := context.Background()
+
+	err := client.ConcurrencyLimitsV2.Delete(ctx, "test")
+	if err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+}
+
+func TestConcurrencyLimitsV2Service_List(t *testing.T) {
+	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		w.Header().Set("Content-Type", "application/json")
+		json.NewEncoder(w).Encode([]models.GlobalConcurrencyLimitResponse{
+			{ID: uuid.New(), Name: "test", Limit: 5, ActiveSlots: 0},
+		})
+	}))
+	defer server.Close()
+
+	client, _ := NewClient(WithBaseURL(server.URL + "/api"))
+	ctx := context.Background()
+
+	limits, err := client.ConcurrencyLimitsV2.List(ctx)
+	if err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+	if len(limits) != 1 {
+		t.Errorf("count = %v, want 1", len(limits))
+	}
+}
+
+func TestConcurrencyLimitsV2Service_RenewLease(t *testing.T) {
+	leaseID := uuid.New()
+
+	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		if r.Method != http.MethodPost {
+			t.Errorf("method = %v, want POST", r.Method)
+		}
+		w.WriteHeader(http.StatusOK)
+	}))
+	defer server.Close()
+
+	client, _ := NewClient(WithBaseURL(server.URL + "/api"))
+	ctx := context.Background()
+
+	err := client.ConcurrencyLimitsV2.RenewLease(ctx, leaseID)
+	if err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+}
diff --git a/pkg/client/events.go b/pkg/client/events.go
new file mode 100644
index 0000000..7d049d6
--- /dev/null
+++ b/pkg/client/events.go
@@ -0,0 +1,57 @@
+package client
+
+import (
+	"context"
+	"fmt"
+
+	"git.schultes.dev/schultesdev/prefect-go/pkg/models"
+)
+
+// EventsService handles operations related to events.
+type EventsService struct {
+	client *Client
+}
+
+// Create creates new events.
+func (s *EventsService) Create(ctx context.Context, events []models.Event) error {
+	if err := s.client.post(ctx, "/events", events, nil); err != nil {
+		return fmt.Errorf("failed to create events: %w", err)
+	}
+	return nil
+}
+
+// List retrieves events with filtering.
+func (s *EventsService) List(ctx context.Context, filter *models.EventFilter) (*models.EventPage, error) {
+	if filter == nil {
+		filter = &models.EventFilter{}
+	}
+
+	var page models.EventPage
+	if err := s.client.post(ctx, "/events/filter", filter, &page); err != nil {
+		return nil, fmt.Errorf("failed to list events: %w", err)
+	}
+	return &page, nil
+}
+
+// NextPage retrieves the next page of events using a page token.
+func (s *EventsService) NextPage(ctx context.Context) (*models.EventPage, error) { + var page models.EventPage + if err := s.client.get(ctx, "/events/filter/next", &page); err != nil { + return nil, fmt.Errorf("failed to get next events page: %w", err) + } + return &page, nil +} + +// CountBy counts events grouped by a countable field. +func (s *EventsService) CountBy(ctx context.Context, countable string, filter *models.EventFilter) ([]models.EventCount, error) { + if filter == nil { + filter = &models.EventFilter{} + } + + path := joinPath("/events/count-by", countable) + var counts []models.EventCount + if err := s.client.post(ctx, path, filter, &counts); err != nil { + return nil, fmt.Errorf("failed to count events: %w", err) + } + return counts, nil +} diff --git a/pkg/client/events_test.go b/pkg/client/events_test.go new file mode 100644 index 0000000..3cdbb3e --- /dev/null +++ b/pkg/client/events_test.go @@ -0,0 +1,62 @@ +package client + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + + "git.schultes.dev/schultesdev/prefect-go/pkg/models" + "github.com/google/uuid" +) + +func TestEventsService_List(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/api/events/filter" { + t.Errorf("path = %v, want /api/events/filter", r.URL.Path) + } + if r.Method != http.MethodPost { + t.Errorf("method = %v, want POST", r.Method) + } + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(models.EventPage{ + Events: []models.Event{{ID: uuid.New(), Event: "test.event"}}, + Total: 1, + }) + })) + defer server.Close() + + client, _ := NewClient(WithBaseURL(server.URL + "/api")) + ctx := context.Background() + + page, err := client.Events.List(ctx, nil) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if page.Total != 1 { + t.Errorf("total = %v, want 1", page.Total) + } +} + +func TestEventsService_CountBy(t 
*testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + t.Errorf("method = %v, want POST", r.Method) + } + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode([]models.EventCount{{Value: "test", Count: 5}}) + })) + defer server.Close() + + client, _ := NewClient(WithBaseURL(server.URL + "/api")) + ctx := context.Background() + + counts, err := client.Events.CountBy(ctx, "event", nil) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(counts) != 1 { + t.Errorf("count = %v, want 1", len(counts)) + } +} diff --git a/pkg/client/flow_run_states.go b/pkg/client/flow_run_states.go new file mode 100644 index 0000000..0f43bd4 --- /dev/null +++ b/pkg/client/flow_run_states.go @@ -0,0 +1,33 @@ +package client + +import ( + "context" + "fmt" + + "git.schultes.dev/schultesdev/prefect-go/pkg/models" + "github.com/google/uuid" +) + +// FlowRunStatesService handles operations related to flow run states. +type FlowRunStatesService struct { + client *Client +} + +// List retrieves all flow run states. +func (s *FlowRunStatesService) List(ctx context.Context) ([]models.State, error) { + var states []models.State + if err := s.client.get(ctx, "/flow_run_states/", &states); err != nil { + return nil, fmt.Errorf("failed to list flow run states: %w", err) + } + return states, nil +} + +// Get retrieves a flow run state by ID. 
+func (s *FlowRunStatesService) Get(ctx context.Context, id uuid.UUID) (*models.State, error) { + var state models.State + path := joinPath("/flow_run_states", id.String()) + if err := s.client.get(ctx, path, &state); err != nil { + return nil, fmt.Errorf("failed to get flow run state: %w", err) + } + return &state, nil +} diff --git a/pkg/client/flow_run_states_test.go b/pkg/client/flow_run_states_test.go new file mode 100644 index 0000000..4768d02 --- /dev/null +++ b/pkg/client/flow_run_states_test.go @@ -0,0 +1,63 @@ +package client + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + + "git.schultes.dev/schultesdev/prefect-go/pkg/models" + "github.com/google/uuid" +) + +func TestFlowRunStatesService_List(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/api/flow_run_states/" { + t.Errorf("path = %v, want /api/flow_run_states/", r.URL.Path) + } + if r.Method != http.MethodGet { + t.Errorf("method = %v, want GET", r.Method) + } + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode([]models.State{ + {ID: uuid.New(), Type: models.StateTypeCompleted}, + }) + })) + defer server.Close() + + client, _ := NewClient(WithBaseURL(server.URL + "/api")) + ctx := context.Background() + + states, err := client.FlowRunStates.List(ctx) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(states) != 1 { + t.Errorf("count = %v, want 1", len(states)) + } +} + +func TestFlowRunStatesService_Get(t *testing.T) { + id := uuid.New() + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + t.Errorf("method = %v, want GET", r.Method) + } + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(models.State{ID: id, Type: models.StateTypeRunning}) + })) + defer server.Close() + + client, _ := NewClient(WithBaseURL(server.URL + 
"/api")) + ctx := context.Background() + + state, err := client.FlowRunStates.Get(ctx, id) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if state.ID != id { + t.Errorf("ID = %v, want %v", state.ID, id) + } +} diff --git a/pkg/client/saved_searches.go b/pkg/client/saved_searches.go new file mode 100644 index 0000000..b2243c2 --- /dev/null +++ b/pkg/client/saved_searches.go @@ -0,0 +1,52 @@ +package client + +import ( + "context" + "fmt" + "net/http" + + "git.schultes.dev/schultesdev/prefect-go/pkg/models" + "github.com/google/uuid" +) + +// SavedSearchesService handles operations related to saved searches. +type SavedSearchesService struct { + client *Client +} + +// Create creates a new saved search. +func (s *SavedSearchesService) Create(ctx context.Context, req *models.SavedSearchCreate) (*models.SavedSearch, error) { + var savedSearch models.SavedSearch + if err := s.client.do(ctx, http.MethodPut, "/saved_searches/", req, &savedSearch); err != nil { + return nil, fmt.Errorf("failed to create saved search: %w", err) + } + return &savedSearch, nil +} + +// Get retrieves a saved search by ID. +func (s *SavedSearchesService) Get(ctx context.Context, id uuid.UUID) (*models.SavedSearch, error) { + var savedSearch models.SavedSearch + path := joinPath("/saved_searches", id.String()) + if err := s.client.get(ctx, path, &savedSearch); err != nil { + return nil, fmt.Errorf("failed to get saved search: %w", err) + } + return &savedSearch, nil +} + +// Delete deletes a saved search by ID. +func (s *SavedSearchesService) Delete(ctx context.Context, id uuid.UUID) error { + path := joinPath("/saved_searches", id.String()) + if err := s.client.delete(ctx, path); err != nil { + return fmt.Errorf("failed to delete saved search: %w", err) + } + return nil +} + +// List retrieves all saved searches. 
+func (s *SavedSearchesService) List(ctx context.Context) ([]models.SavedSearch, error) { + var savedSearches []models.SavedSearch + if err := s.client.post(ctx, "/saved_searches/filter", nil, &savedSearches); err != nil { + return nil, fmt.Errorf("failed to list saved searches: %w", err) + } + return savedSearches, nil +} diff --git a/pkg/client/saved_searches_test.go b/pkg/client/saved_searches_test.go new file mode 100644 index 0000000..3493a27 --- /dev/null +++ b/pkg/client/saved_searches_test.go @@ -0,0 +1,103 @@ +package client + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + + "git.schultes.dev/schultesdev/prefect-go/pkg/models" + "github.com/google/uuid" +) + +func TestSavedSearchesService_Create(t *testing.T) { + expected := models.SavedSearch{ID: uuid.New(), Name: "test-search"} + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPut { + t.Errorf("method = %v, want PUT", r.Method) + } + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(expected) + })) + defer server.Close() + + client, _ := NewClient(WithBaseURL(server.URL + "/api")) + ctx := context.Background() + + search, err := client.SavedSearches.Create(ctx, &models.SavedSearchCreate{Name: "test-search"}) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if search.Name != expected.Name { + t.Errorf("Name = %v, want %v", search.Name, expected.Name) + } +} + +func TestSavedSearchesService_Get(t *testing.T) { + id := uuid.New() + expected := models.SavedSearch{ID: id, Name: "test"} + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + t.Errorf("method = %v, want GET", r.Method) + } + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(expected) + })) + defer server.Close() + + client, _ := NewClient(WithBaseURL(server.URL + "/api")) + ctx := 
context.Background() + + search, err := client.SavedSearches.Get(ctx, id) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if search.ID != id { + t.Errorf("ID = %v, want %v", search.ID, id) + } +} + +func TestSavedSearchesService_Delete(t *testing.T) { + id := uuid.New() + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodDelete { + t.Errorf("method = %v, want DELETE", r.Method) + } + w.WriteHeader(http.StatusNoContent) + })) + defer server.Close() + + client, _ := NewClient(WithBaseURL(server.URL + "/api")) + ctx := context.Background() + + err := client.SavedSearches.Delete(ctx, id) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } +} + +func TestSavedSearchesService_List(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/api/saved_searches/filter" { + t.Errorf("path = %v, want /api/saved_searches/filter", r.URL.Path) + } + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode([]models.SavedSearch{{ID: uuid.New(), Name: "test"}}) + })) + defer server.Close() + + client, _ := NewClient(WithBaseURL(server.URL + "/api")) + ctx := context.Background() + + searches, err := client.SavedSearches.List(ctx) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(searches) != 1 { + t.Errorf("count = %v, want 1", len(searches)) + } +} diff --git a/pkg/client/task_run_states.go b/pkg/client/task_run_states.go new file mode 100644 index 0000000..a894eee --- /dev/null +++ b/pkg/client/task_run_states.go @@ -0,0 +1,33 @@ +package client + +import ( + "context" + "fmt" + + "git.schultes.dev/schultesdev/prefect-go/pkg/models" + "github.com/google/uuid" +) + +// TaskRunStatesService handles operations related to task run states. +type TaskRunStatesService struct { + client *Client +} + +// List retrieves all task run states. 
+func (s *TaskRunStatesService) List(ctx context.Context) ([]models.State, error) { + var states []models.State + if err := s.client.get(ctx, "/task_run_states/", &states); err != nil { + return nil, fmt.Errorf("failed to list task run states: %w", err) + } + return states, nil +} + +// Get retrieves a task run state by ID. +func (s *TaskRunStatesService) Get(ctx context.Context, id uuid.UUID) (*models.State, error) { + var state models.State + path := joinPath("/task_run_states", id.String()) + if err := s.client.get(ctx, path, &state); err != nil { + return nil, fmt.Errorf("failed to get task run state: %w", err) + } + return &state, nil +} diff --git a/pkg/client/task_run_states_test.go b/pkg/client/task_run_states_test.go new file mode 100644 index 0000000..ce88b9a --- /dev/null +++ b/pkg/client/task_run_states_test.go @@ -0,0 +1,63 @@ +package client + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + + "git.schultes.dev/schultesdev/prefect-go/pkg/models" + "github.com/google/uuid" +) + +func TestTaskRunStatesService_List(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/api/task_run_states/" { + t.Errorf("path = %v, want /api/task_run_states/", r.URL.Path) + } + if r.Method != http.MethodGet { + t.Errorf("method = %v, want GET", r.Method) + } + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode([]models.State{ + {ID: uuid.New(), Type: models.StateTypeFailed}, + }) + })) + defer server.Close() + + client, _ := NewClient(WithBaseURL(server.URL + "/api")) + ctx := context.Background() + + states, err := client.TaskRunStates.List(ctx) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(states) != 1 { + t.Errorf("count = %v, want 1", len(states)) + } +} + +func TestTaskRunStatesService_Get(t *testing.T) { + id := uuid.New() + + server := httptest.NewServer(http.HandlerFunc(func(w 
http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + t.Errorf("method = %v, want GET", r.Method) + } + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(models.State{ID: id, Type: models.StateTypePending}) + })) + defer server.Close() + + client, _ := NewClient(WithBaseURL(server.URL + "/api")) + ctx := context.Background() + + state, err := client.TaskRunStates.Get(ctx, id) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if state.ID != id { + t.Errorf("ID = %v, want %v", state.ID, id) + } +} diff --git a/pkg/client/task_workers.go b/pkg/client/task_workers.go new file mode 100644 index 0000000..3e882b1 --- /dev/null +++ b/pkg/client/task_workers.go @@ -0,0 +1,26 @@ +package client + +import ( + "context" + "fmt" + + "git.schultes.dev/schultesdev/prefect-go/pkg/models" +) + +// TaskWorkersService handles operations related to task workers. +type TaskWorkersService struct { + client *Client +} + +// List retrieves task workers matching the filter. 
func (s *TaskWorkersService) List(ctx context.Context, filter *models.TaskWorkerFilter) ([]models.TaskWorkerResponse, error) {
	// A nil filter is normalized to an empty filter so the POST body is a
	// valid JSON object rather than null.
	if filter == nil {
		filter = &models.TaskWorkerFilter{}
	}

	var workers []models.TaskWorkerResponse
	if err := s.client.post(ctx, "/task_workers/filter", filter, &workers); err != nil {
		return nil, fmt.Errorf("failed to list task workers: %w", err)
	}
	return workers, nil
}
diff --git a/pkg/client/task_workers_test.go b/pkg/client/task_workers_test.go
new file mode 100644
index 0000000..cc9c3b4
--- /dev/null
+++ b/pkg/client/task_workers_test.go
@@ -0,0 +1,41 @@
package client

import (
	"context"
	"encoding/json"
	"net/http"
	"net/http/httptest"
	"testing"

	"git.schultes.dev/schultesdev/prefect-go/pkg/models"
)

// TestTaskWorkersService_List verifies List POSTs the filter to
// /api/task_workers/filter and decodes the worker list.
func TestTaskWorkersService_List(t *testing.T) {
	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.URL.Path != "/api/task_workers/filter" {
			t.Errorf("path = %v, want /api/task_workers/filter", r.URL.Path)
		}
		if r.Method != http.MethodPost {
			t.Errorf("method = %v, want POST", r.Method)
		}
		w.Header().Set("Content-Type", "application/json")
		json.NewEncoder(w).Encode([]models.TaskWorkerResponse{
			{Identifier: "worker-1", TaskKeys: []string{"key1"}},
		})
	}))
	defer server.Close()

	client, _ := NewClient(WithBaseURL(server.URL + "/api"))
	ctx := context.Background()

	workers, err := client.TaskWorkers.List(ctx, &models.TaskWorkerFilter{TaskKeys: []string{"key1"}})
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if len(workers) != 1 {
		t.Errorf("count = %v, want 1", len(workers))
	}
	if workers[0].Identifier != "worker-1" {
		t.Errorf("Identifier = %v, want worker-1", workers[0].Identifier)
	}
}
diff --git a/pkg/models/artifacts.go b/pkg/models/artifacts.go
new file mode 100644
index 0000000..9d73ab0
--- /dev/null
+++ b/pkg/models/artifacts.go
@@ -0,0 +1,127 @@
package models

import (
	"encoding/json"
	"time"

	"github.com/google/uuid"
)

// ArtifactSort represents sort options for artifacts.
type ArtifactSort string

const (
	ArtifactSortCreatedDesc ArtifactSort = "CREATED_DESC"
	ArtifactSortUpdatedDesc ArtifactSort = "UPDATED_DESC"
)

// ArtifactCollectionSort represents sort options for artifact collections.
type ArtifactCollectionSort string

const (
	ArtifactCollectionSortCreatedDesc ArtifactCollectionSort = "CREATED_DESC"
	ArtifactCollectionSortUpdatedDesc ArtifactCollectionSort = "UPDATED_DESC"
)

// Artifact represents a Prefect artifact.
// Created/Updated are decoded through the custom UnmarshalJSON below.
type Artifact struct {
	ID          uuid.UUID  `json:"id"`
	Created     *time.Time `json:"created"`
	Updated     *time.Time `json:"updated"`
	Key         *string    `json:"key"`
	Type        *string    `json:"type"`
	Description *string    `json:"description"`
	// Data is left untyped; its shape depends on the artifact type.
	Data interface{} `json:"data,omitempty"`
	// Note the trailing underscore in the wire name "metadata_".
	Metadata  map[string]string `json:"metadata_,omitempty"`
	FlowRunID *uuid.UUID        `json:"flow_run_id"`
	TaskRunID *uuid.UUID        `json:"task_run_id"`
}

// ArtifactCreate represents the request to create an artifact.
type ArtifactCreate struct {
	Key         *string           `json:"key,omitempty"`
	Type        *string           `json:"type,omitempty"`
	Description *string           `json:"description,omitempty"`
	Data        interface{}       `json:"data,omitempty"`
	Metadata    map[string]string `json:"metadata_,omitempty"`
	FlowRunID   *uuid.UUID        `json:"flow_run_id,omitempty"`
	TaskRunID   *uuid.UUID        `json:"task_run_id,omitempty"`
}

// ArtifactUpdate represents the request to update an artifact.
// Only data, description, and metadata are mutable.
type ArtifactUpdate struct {
	Data        interface{}       `json:"data,omitempty"`
	Description *string           `json:"description,omitempty"`
	Metadata    map[string]string `json:"metadata_,omitempty"`
}

// ArtifactFilter represents filter criteria for querying artifacts.
type ArtifactFilter struct {
	Key       *string    `json:"key,omitempty"`
	FlowRunID *uuid.UUID `json:"flow_run_id,omitempty"`
	TaskRunID *uuid.UUID `json:"task_run_id,omitempty"`
	Type      *string    `json:"type,omitempty"`
	// omitempty means Offset/Limit of 0 are dropped from the payload.
	Offset int `json:"offset,omitempty"`
	Limit  int `json:"limit,omitempty"`
}

// ArtifactCollection represents a collection of artifacts with the same key.
// LatestID identifies the most recent artifact in the collection.
type ArtifactCollection struct {
	ID          uuid.UUID         `json:"id"`
	Created     *time.Time        `json:"created"`
	Updated     *time.Time        `json:"updated"`
	Key         string            `json:"key"`
	LatestID    uuid.UUID         `json:"latest_id"`
	Type        *string           `json:"type"`
	Description *string           `json:"description"`
	Data        interface{}       `json:"data,omitempty"`
	Metadata    map[string]string `json:"metadata_,omitempty"`
	FlowRunID   *uuid.UUID        `json:"flow_run_id"`
	TaskRunID   *uuid.UUID        `json:"task_run_id"`
}

// ArtifactCollectionFilter represents filter criteria for querying artifact collections.
type ArtifactCollectionFilter struct {
	Key       *string    `json:"key,omitempty"`
	FlowRunID *uuid.UUID `json:"flow_run_id,omitempty"`
	TaskRunID *uuid.UUID `json:"task_run_id,omitempty"`
	Type      *string    `json:"type,omitempty"`
	Offset    int        `json:"offset,omitempty"`
	Limit     int        `json:"limit,omitempty"`
}

// UnmarshalJSON implements custom JSON unmarshaling for time fields.
// The Alias type strips Artifact's methods so json.Unmarshal does not
// recurse; Created/Updated are decoded through optTime and copied back.
func (a *Artifact) UnmarshalJSON(data []byte) error {
	type Alias Artifact
	aux := &struct {
		Created optTime `json:"created"`
		Updated optTime `json:"updated"`
		*Alias
	}{
		Alias: (*Alias)(a),
	}
	if err := json.Unmarshal(data, aux); err != nil {
		return err
	}
	a.Created = aux.Created.V
	a.Updated = aux.Updated.V
	return nil
}

// UnmarshalJSON implements custom JSON unmarshaling for time fields.
// Same alias pattern as Artifact.UnmarshalJSON above.
func (ac *ArtifactCollection) UnmarshalJSON(data []byte) error {
	type Alias ArtifactCollection
	aux := &struct {
		Created optTime `json:"created"`
		Updated optTime `json:"updated"`
		*Alias
	}{
		Alias: (*Alias)(ac),
	}
	if err := json.Unmarshal(data, aux); err != nil {
		return err
	}
	ac.Created = aux.Created.V
	ac.Updated = aux.Updated.V
	return nil
}
diff --git a/pkg/models/automations.go b/pkg/models/automations.go
new file mode 100644
index 0000000..5959bee
--- /dev/null
+++ b/pkg/models/automations.go
@@ -0,0 +1,247 @@
package models

import (
	"encoding/json"
	"time"

	"github.com/google/uuid"
)

// AutomationSort represents sort options for automations.
type AutomationSort string

const (
	AutomationSortCreatedDesc AutomationSort = "CREATED_DESC"
	AutomationSortUpdatedDesc AutomationSort = "UPDATED_DESC"
	AutomationSortNameAsc     AutomationSort = "NAME_ASC"
	AutomationSortNameDesc    AutomationSort = "NAME_DESC"
)

// TriggerPosture represents the posture of an event trigger.
type TriggerPosture string

const (
	TriggerPostureReactive  TriggerPosture = "Reactive"
	TriggerPostureProactive TriggerPosture = "Proactive"
)

// ActionSource represents the source type for automation actions.
type ActionSource string

const (
	ActionSourceSelected ActionSource = "selected"
	ActionSourceInferred ActionSource = "inferred"
)

// ResourceSpecification represents resource labels to match.
type ResourceSpecification map[string]string

// Automation represents a Prefect automation.
type Automation struct {
	ID          uuid.UUID  `json:"id"`
	Created     *time.Time `json:"created"`
	Updated     *time.Time `json:"updated"`
	Name        string     `json:"name"`
	Description string     `json:"description"`
	Enabled     bool       `json:"enabled"`
	Tags        []string   `json:"tags,omitempty"`
	// Trigger and the action slices are kept as raw JSON; callers decode
	// them into the concrete trigger/action types declared in this file.
	Trigger          json.RawMessage   `json:"trigger"`
	Actions          []json.RawMessage `json:"actions"`
	ActionsOnTrigger []json.RawMessage `json:"actions_on_trigger,omitempty"`
	ActionsOnResolve []json.RawMessage `json:"actions_on_resolve,omitempty"`
}

// AutomationCreate represents the request to create an automation.
type AutomationCreate struct {
	Name        string   `json:"name"`
	Description string   `json:"description,omitempty"`
	Enabled     *bool    `json:"enabled,omitempty"`
	Tags        []string `json:"tags,omitempty"`
	// Trigger/Actions are pre-serialized JSON supplied by the caller.
	Trigger          json.RawMessage   `json:"trigger"`
	Actions          []json.RawMessage `json:"actions"`
	ActionsOnTrigger []json.RawMessage `json:"actions_on_trigger,omitempty"`
	ActionsOnResolve []json.RawMessage `json:"actions_on_resolve,omitempty"`
}

// AutomationUpdate represents the request to fully update an automation.
// Field set mirrors AutomationCreate: a full replacement, not a patch.
type AutomationUpdate struct {
	Name             string            `json:"name"`
	Description      string            `json:"description,omitempty"`
	Enabled          *bool             `json:"enabled,omitempty"`
	Tags             []string          `json:"tags,omitempty"`
	Trigger          json.RawMessage   `json:"trigger"`
	Actions          []json.RawMessage `json:"actions"`
	ActionsOnTrigger []json.RawMessage `json:"actions_on_trigger,omitempty"`
	ActionsOnResolve []json.RawMessage `json:"actions_on_resolve,omitempty"`
}

// AutomationPartialUpdate represents the request to partially update an automation.
// Only the enabled flag can be patched.
type AutomationPartialUpdate struct {
	Enabled *bool `json:"enabled,omitempty"`
}

// AutomationFilter represents filter criteria for querying automations.
type AutomationFilter struct {
	Name *string  `json:"name,omitempty"`
	Tags []string `json:"tags,omitempty"`
	// omitempty means Offset/Limit of 0 are dropped from the payload.
	Offset int `json:"offset,omitempty"`
	Limit  int `json:"limit,omitempty"`
}

// EventTrigger represents an event-based automation trigger.
type EventTrigger struct {
	Type         string                `json:"type"`
	ID           uuid.UUID             `json:"id"`
	Match        ResourceSpecification `json:"match,omitempty"`
	MatchRelated json.RawMessage       `json:"match_related,omitempty"`
	After        []string              `json:"after,omitempty"`
	Expect       []string              `json:"expect,omitempty"`
	ForEach      []string              `json:"for_each,omitempty"`
	Posture      TriggerPosture        `json:"posture"`
	Threshold    int                   `json:"threshold,omitempty"`
	// Within is a window length; presumably seconds — confirm against the
	// Prefect API schema.
	Within float64 `json:"within,omitempty"`
}

// CompoundTrigger represents a compound automation trigger.
// Triggers and Require stay raw JSON for caller-side decoding.
type CompoundTrigger struct {
	Type     string          `json:"type"`
	ID       uuid.UUID       `json:"id"`
	Triggers json.RawMessage `json:"triggers"`
	Within   *float64        `json:"within"`
	Require  json.RawMessage `json:"require"`
}

// SequenceTrigger represents a sequence automation trigger.
type SequenceTrigger struct {
	Type     string          `json:"type"`
	ID       uuid.UUID       `json:"id"`
	Triggers json.RawMessage `json:"triggers"`
	Within   *float64        `json:"within"`
}

// DoNothing represents a no-op automation action.
type DoNothing struct {
	Type string `json:"type"`
}

// RunDeployment represents an action to run a deployment.
// DeploymentID is required when Source is "selected"; with "inferred" the
// deployment comes from the triggering event.
type RunDeployment struct {
	Type         string                 `json:"type"`
	Source       ActionSource           `json:"source,omitempty"`
	DeploymentID *uuid.UUID             `json:"deployment_id,omitempty"`
	Parameters   map[string]interface{} `json:"parameters,omitempty"`
	JobVariables map[string]interface{} `json:"job_variables,omitempty"`
}

// PauseDeploymentAction represents an action to pause a deployment.
type PauseDeploymentAction struct {
	Type         string       `json:"type"`
	Source       ActionSource `json:"source,omitempty"`
	DeploymentID *uuid.UUID   `json:"deployment_id,omitempty"`
}

// ResumeDeploymentAction represents an action to resume a deployment.
type ResumeDeploymentAction struct {
	Type         string       `json:"type"`
	Source       ActionSource `json:"source,omitempty"`
	DeploymentID *uuid.UUID   `json:"deployment_id,omitempty"`
}

// CancelFlowRun represents an action to cancel a flow run.
// The target run is implied by the triggering event; only the type
// discriminator is serialized.
type CancelFlowRun struct {
	Type string `json:"type"`
}

// SuspendFlowRun represents an action to suspend a flow run.
type SuspendFlowRun struct {
	Type string `json:"type"`
}

// ResumeFlowRunAction represents an action to resume a flow run.
type ResumeFlowRunAction struct {
	Type string `json:"type"`
}

// ChangeFlowRunState represents an action to change a flow run's state.
type ChangeFlowRunState struct {
	Type string `json:"type"`
	// Name optionally overrides the display name of the new state.
	Name    *string   `json:"name,omitempty"`
	State   StateType `json:"state"`
	Message *string   `json:"message,omitempty"`
	Force   bool      `json:"force,omitempty"`
}

// PauseWorkQueueAction represents an action to pause a work queue.
type PauseWorkQueueAction struct {
	Type        string       `json:"type"`
	Source      ActionSource `json:"source,omitempty"`
	WorkQueueID *uuid.UUID   `json:"work_queue_id,omitempty"`
}

// ResumeWorkQueueAction represents an action to resume a work queue.
type ResumeWorkQueueAction struct {
	Type        string       `json:"type"`
	Source      ActionSource `json:"source,omitempty"`
	WorkQueueID *uuid.UUID   `json:"work_queue_id,omitempty"`
}

// PauseWorkPoolAction represents an action to pause a work pool.
type PauseWorkPoolAction struct {
	Type       string       `json:"type"`
	Source     ActionSource `json:"source,omitempty"`
	WorkPoolID *uuid.UUID   `json:"work_pool_id,omitempty"`
}

// ResumeWorkPoolAction represents an action to resume a work pool.
type ResumeWorkPoolAction struct {
	Type       string       `json:"type"`
	Source     ActionSource `json:"source,omitempty"`
	WorkPoolID *uuid.UUID   `json:"work_pool_id,omitempty"`
}

// PauseAutomationAction represents an action to pause an automation.
type PauseAutomationAction struct {
	Type         string       `json:"type"`
	Source       ActionSource `json:"source,omitempty"`
	AutomationID *uuid.UUID   `json:"automation_id,omitempty"`
}

// ResumeAutomationAction represents an action to resume an automation.
type ResumeAutomationAction struct {
	Type         string       `json:"type"`
	Source       ActionSource `json:"source,omitempty"`
	AutomationID *uuid.UUID   `json:"automation_id,omitempty"`
}

// SendNotification represents an action to send a notification.
// BlockDocumentID references the notification block holding credentials.
type SendNotification struct {
	Type            string    `json:"type"`
	BlockDocumentID uuid.UUID `json:"block_document_id"`
	Subject         string    `json:"subject,omitempty"`
	Body            string    `json:"body"`
}

// CallWebhook represents an action to call a webhook.
type CallWebhook struct {
	Type            string    `json:"type"`
	BlockDocumentID uuid.UUID `json:"block_document_id"`
	Payload         string    `json:"payload,omitempty"`
}

// UnmarshalJSON implements custom JSON unmarshaling for time fields.
// The Alias type strips Automation's methods so json.Unmarshal does not
// recurse; Created/Updated are decoded through optTime and copied back.
func (a *Automation) UnmarshalJSON(data []byte) error {
	type Alias Automation
	aux := &struct {
		Created optTime `json:"created"`
		Updated optTime `json:"updated"`
		*Alias
	}{
		Alias: (*Alias)(a),
	}
	if err := json.Unmarshal(data, aux); err != nil {
		return err
	}
	a.Created = aux.Created.V
	a.Updated = aux.Updated.V
	return nil
}
diff --git a/pkg/models/blocks.go b/pkg/models/blocks.go
new file mode 100644
index 0000000..a0bed67
--- /dev/null
+++ b/pkg/models/blocks.go
@@ -0,0 +1,176 @@
package models

import (
	"encoding/json"
	"time"

	"github.com/google/uuid"
)

// BlockDocumentSort represents sort options for block documents.
type BlockDocumentSort string

const (
	BlockDocumentSortNameAsc  BlockDocumentSort = "NAME_ASC"
	BlockDocumentSortNameDesc BlockDocumentSort = "NAME_DESC"
)

// BlockType represents a Prefect block type.
// Created/Updated are decoded through the custom UnmarshalJSON below.
type BlockType struct {
	ID               uuid.UUID  `json:"id"`
	Created          *time.Time `json:"created"`
	Updated          *time.Time `json:"updated"`
	Name             string     `json:"name"`
	Slug             string     `json:"slug"`
	LogoURL          *string    `json:"logo_url"`
	DocumentationURL *string    `json:"documentation_url"`
	Description      *string    `json:"description"`
	CodeExample      *string    `json:"code_example"`
	// IsProtected marks server-managed block types.
	IsProtected bool `json:"is_protected"`
}

// BlockTypeCreate represents the request to create a block type.
type BlockTypeCreate struct {
	Name             string  `json:"name"`
	Slug             string  `json:"slug"`
	LogoURL          *string `json:"logo_url,omitempty"`
	DocumentationURL *string `json:"documentation_url,omitempty"`
	Description      *string `json:"description,omitempty"`
	CodeExample      *string `json:"code_example,omitempty"`
}

// BlockTypeUpdate represents the request to update a block type.
// Name and slug are immutable and therefore absent here.
type BlockTypeUpdate struct {
	LogoURL          *string `json:"logo_url,omitempty"`
	DocumentationURL *string `json:"documentation_url,omitempty"`
	Description      *string `json:"description,omitempty"`
	CodeExample      *string `json:"code_example,omitempty"`
}

// BlockTypeFilter represents filter criteria for querying block types.
// NOTE(review): unlike ArtifactFilter, these fields carry no json tags, so
// direct marshaling would emit Go field names — presumably this filter is
// translated client-side before the request; confirm at the call site.
type BlockTypeFilter struct {
	Name         *string  // Filter by name (partial match)
	Slugs        []string // Filter by slugs
	Capabilities []string // Filter by block capabilities
}

// BlockSchema represents a Prefect block schema.
+type BlockSchema struct { + ID uuid.UUID `json:"id"` + Created *time.Time `json:"created"` + Updated *time.Time `json:"updated"` + Checksum string `json:"checksum"` + Fields map[string]interface{} `json:"fields"` + BlockTypeID *uuid.UUID `json:"block_type_id"` + BlockType *BlockType `json:"block_type"` + Capabilities []string `json:"capabilities"` + Version string `json:"version"` +} + +// BlockSchemaCreate represents the request to create a block schema. +type BlockSchemaCreate struct { + Fields map[string]interface{} `json:"fields,omitempty"` + BlockTypeID uuid.UUID `json:"block_type_id"` + Capabilities []string `json:"capabilities,omitempty"` + Version *string `json:"version,omitempty"` +} + +// BlockSchemaFilter represents filter criteria for querying block schemas. +type BlockSchemaFilter struct { + BlockTypeID *uuid.UUID // Filter by block type ID + Capabilities []string // Filter by required capabilities + Version *string // Filter by schema version +} + +// BlockDocument represents a Prefect block document. +type BlockDocument struct { + ID uuid.UUID `json:"id"` + Created *time.Time `json:"created"` + Updated *time.Time `json:"updated"` + Name *string `json:"name"` + Data map[string]interface{} `json:"data"` + BlockSchemaID uuid.UUID `json:"block_schema_id"` + BlockSchema *BlockSchema `json:"block_schema"` + BlockTypeID uuid.UUID `json:"block_type_id"` + BlockTypeName *string `json:"block_type_name"` + BlockType *BlockType `json:"block_type"` + BlockDocumentReferences map[string]interface{} `json:"block_document_references"` + IsAnonymous bool `json:"is_anonymous"` +} + +// BlockDocumentCreate represents the request to create a block document. 
+type BlockDocumentCreate struct { + Name *string `json:"name,omitempty"` + Data map[string]interface{} `json:"data,omitempty"` + BlockSchemaID uuid.UUID `json:"block_schema_id"` + BlockTypeID uuid.UUID `json:"block_type_id"` + IsAnonymous bool `json:"is_anonymous,omitempty"` +} + +// BlockDocumentUpdate represents the request to update a block document. +type BlockDocumentUpdate struct { + BlockSchemaID *uuid.UUID `json:"block_schema_id,omitempty"` + Data map[string]interface{} `json:"data"` + MergeExisting *bool `json:"merge_existing_data,omitempty"` +} + +// BlockDocumentFilter represents filter criteria for querying block documents. +type BlockDocumentFilter struct { + BlockTypeID *uuid.UUID // Filter by block type ID + Name *string // Filter by document name + IsAnonymous *bool // Filter by anonymity +} + +// UnmarshalJSON implements custom JSON unmarshaling for time fields. +func (bt *BlockType) UnmarshalJSON(data []byte) error { + type Alias BlockType + aux := &struct { + Created optTime `json:"created"` + Updated optTime `json:"updated"` + *Alias + }{ + Alias: (*Alias)(bt), + } + if err := json.Unmarshal(data, aux); err != nil { + return err + } + bt.Created = aux.Created.V + bt.Updated = aux.Updated.V + return nil +} + +// UnmarshalJSON implements custom JSON unmarshaling for time fields. +func (bs *BlockSchema) UnmarshalJSON(data []byte) error { + type Alias BlockSchema + aux := &struct { + Created optTime `json:"created"` + Updated optTime `json:"updated"` + *Alias + }{ + Alias: (*Alias)(bs), + } + if err := json.Unmarshal(data, aux); err != nil { + return err + } + bs.Created = aux.Created.V + bs.Updated = aux.Updated.V + return nil +} + +// UnmarshalJSON implements custom JSON unmarshaling for time fields. 
+func (bd *BlockDocument) UnmarshalJSON(data []byte) error { + type Alias BlockDocument + aux := &struct { + Created optTime `json:"created"` + Updated optTime `json:"updated"` + *Alias + }{ + Alias: (*Alias)(bd), + } + if err := json.Unmarshal(data, aux); err != nil { + return err + } + bd.Created = aux.Created.V + bd.Updated = aux.Updated.V + return nil +} diff --git a/pkg/models/concurrency_limits.go b/pkg/models/concurrency_limits.go new file mode 100644 index 0000000..62c3bc4 --- /dev/null +++ b/pkg/models/concurrency_limits.go @@ -0,0 +1,137 @@ +package models + +import ( + "encoding/json" + "time" + + "github.com/google/uuid" +) + +// ConcurrencyLimit represents a Prefect v1 concurrency limit. +type ConcurrencyLimit struct { + ID uuid.UUID `json:"id"` + Created *time.Time `json:"created"` + Updated *time.Time `json:"updated"` + Tag string `json:"tag"` + ConcurrencyLimit int `json:"concurrency_limit"` + ActiveSlots []string `json:"active_slots,omitempty"` +} + +// ConcurrencyLimitCreate represents the request to create a v1 concurrency limit. +type ConcurrencyLimitCreate struct { + Tag string `json:"tag"` + ConcurrencyLimit int `json:"concurrency_limit"` +} + +// ConcurrencyLimitV2 represents a Prefect v2 concurrency limit. +type ConcurrencyLimitV2 struct { + ID uuid.UUID `json:"id"` + Created *time.Time `json:"created"` + Updated *time.Time `json:"updated"` + Active bool `json:"active"` + Name string `json:"name"` + Limit int `json:"limit"` + ActiveSlots int `json:"active_slots"` + DeniedSlots int `json:"denied_slots"` + SlotDecayPerSecond float64 `json:"slot_decay_per_second"` + AvgSlotOccupancySeconds float64 `json:"avg_slot_occupancy_seconds"` +} + +// ConcurrencyLimitV2Create represents the request to create a v2 concurrency limit. 
+type ConcurrencyLimitV2Create struct { + Active *bool `json:"active,omitempty"` + Name string `json:"name"` + Limit int `json:"limit"` + ActiveSlots *int `json:"active_slots,omitempty"` + DeniedSlots *int `json:"denied_slots,omitempty"` + SlotDecayPerSecond *float64 `json:"slot_decay_per_second,omitempty"` +} + +// ConcurrencyLimitV2Update represents the request to update a v2 concurrency limit. +type ConcurrencyLimitV2Update struct { + Active *bool `json:"active,omitempty"` + Name *string `json:"name,omitempty"` + Limit *int `json:"limit,omitempty"` + ActiveSlots *int `json:"active_slots,omitempty"` + DeniedSlots *int `json:"denied_slots,omitempty"` + SlotDecayPerSecond *float64 `json:"slot_decay_per_second,omitempty"` +} + +// GlobalConcurrencyLimitResponse represents the response for global concurrency limits. +type GlobalConcurrencyLimitResponse struct { + ID uuid.UUID `json:"id"` + Created *time.Time `json:"created"` + Updated *time.Time `json:"updated"` + Active bool `json:"active"` + Name string `json:"name"` + Limit int `json:"limit"` + ActiveSlots int `json:"active_slots"` + SlotDecayPerSecond float64 `json:"slot_decay_per_second"` +} + +// MinimalConcurrencyLimitResponse represents a minimal concurrency limit response. +type MinimalConcurrencyLimitResponse struct { + ID uuid.UUID `json:"id"` + Name string `json:"name"` + Limit int `json:"limit"` +} + +// ConcurrencyLimitWithLeaseResponse represents a concurrency limit response with lease info. +type ConcurrencyLimitWithLeaseResponse struct { + LeaseID uuid.UUID `json:"lease_id"` + Limits []MinimalConcurrencyLimitResponse `json:"limits"` +} + +// UnmarshalJSON implements custom JSON unmarshaling for time fields. 
+func (cl *ConcurrencyLimit) UnmarshalJSON(data []byte) error { + type Alias ConcurrencyLimit + aux := &struct { + Created optTime `json:"created"` + Updated optTime `json:"updated"` + *Alias + }{ + Alias: (*Alias)(cl), + } + if err := json.Unmarshal(data, aux); err != nil { + return err + } + cl.Created = aux.Created.V + cl.Updated = aux.Updated.V + return nil +} + +// UnmarshalJSON implements custom JSON unmarshaling for time fields. +func (cl *ConcurrencyLimitV2) UnmarshalJSON(data []byte) error { + type Alias ConcurrencyLimitV2 + aux := &struct { + Created optTime `json:"created"` + Updated optTime `json:"updated"` + *Alias + }{ + Alias: (*Alias)(cl), + } + if err := json.Unmarshal(data, aux); err != nil { + return err + } + cl.Created = aux.Created.V + cl.Updated = aux.Updated.V + return nil +} + +// UnmarshalJSON implements custom JSON unmarshaling for time fields. +func (g *GlobalConcurrencyLimitResponse) UnmarshalJSON(data []byte) error { + type Alias GlobalConcurrencyLimitResponse + aux := &struct { + Created optTime `json:"created"` + Updated optTime `json:"updated"` + *Alias + }{ + Alias: (*Alias)(g), + } + if err := json.Unmarshal(data, aux); err != nil { + return err + } + g.Created = aux.Created.V + g.Updated = aux.Updated.V + return nil +} diff --git a/pkg/models/deployments.go b/pkg/models/deployments.go new file mode 100644 index 0000000..59e08db --- /dev/null +++ b/pkg/models/deployments.go @@ -0,0 +1,102 @@ +package models + +import ( + "encoding/json" + "time" + + "github.com/google/uuid" +) + +// DeploymentStatus represents the status of a deployment. +type DeploymentStatus string + +const ( + DeploymentStatusReady DeploymentStatus = "READY" + DeploymentStatusNotReady DeploymentStatus = "NOT_READY" +) + +// CollisionStrategy represents the strategy for handling concurrent flow runs. 
+type CollisionStrategy string + +const ( + CollisionStrategyEnqueue CollisionStrategy = "ENQUEUE" + CollisionStrategyCancelNew CollisionStrategy = "CANCEL_NEW" +) + +// ConcurrencyOptions represents concurrency configuration for a deployment. +type ConcurrencyOptions struct { + CollisionStrategy CollisionStrategy `json:"collision_strategy"` + GracePeriodSeconds *int `json:"grace_period_seconds,omitempty"` +} + +// Deployment represents a Prefect deployment. +type Deployment struct { + ID uuid.UUID `json:"id"` + Created *time.Time `json:"created"` + Updated *time.Time `json:"updated"` + Name string `json:"name"` + FlowID uuid.UUID `json:"flow_id"` + Version *string `json:"version"` + Description *string `json:"description"` + Paused bool `json:"paused"` + Parameters map[string]interface{} `json:"parameters,omitempty"` + Tags []string `json:"tags,omitempty"` + Labels map[string]interface{} `json:"labels,omitempty"` + WorkQueueName *string `json:"work_queue_name"` + WorkPoolName *string `json:"work_pool_name"` + Path *string `json:"path"` + Entrypoint *string `json:"entrypoint"` + Status *DeploymentStatus `json:"status"` + EnforceParameterSchema bool `json:"enforce_parameter_schema"` +} + +// DeploymentCreate represents the request to create a deployment. 
+type DeploymentCreate struct { + Name string `json:"name"` + FlowID uuid.UUID `json:"flow_id"` + Paused bool `json:"paused,omitempty"` + Description *string `json:"description,omitempty"` + Version *string `json:"version,omitempty"` + Parameters map[string]interface{} `json:"parameters,omitempty"` + Tags []string `json:"tags,omitempty"` + Labels map[string]interface{} `json:"labels,omitempty"` + WorkPoolName *string `json:"work_pool_name,omitempty"` + WorkQueueName *string `json:"work_queue_name,omitempty"` + Path *string `json:"path,omitempty"` + Entrypoint *string `json:"entrypoint,omitempty"` + EnforceParameterSchema bool `json:"enforce_parameter_schema,omitempty"` +} + +// DeploymentUpdate represents the request to update a deployment. +type DeploymentUpdate struct { + Paused *bool `json:"paused,omitempty"` + Description *string `json:"description,omitempty"` +} + +// DeploymentFilter represents filter criteria for querying deployments. +type DeploymentFilter struct { + FlowID *uuid.UUID `json:"flow_id,omitempty"` + Name *string `json:"name,omitempty"` + Tags []string `json:"tags,omitempty"` + Paused *bool `json:"paused,omitempty"` + Offset int `json:"offset,omitempty"` + Limit int `json:"limit,omitempty"` +} + +// UnmarshalJSON implements custom JSON unmarshaling for time fields. +func (d *Deployment) UnmarshalJSON(data []byte) error { + type Alias Deployment + aux := &struct { + Created optTime `json:"created"` + Updated optTime `json:"updated"` + *Alias + }{ + Alias: (*Alias)(d), + } + if err := json.Unmarshal(data, aux); err != nil { + return err + } + d.Created = aux.Created.V + d.Updated = aux.Updated.V + return nil +} diff --git a/pkg/models/events.go b/pkg/models/events.go new file mode 100644 index 0000000..8d19021 --- /dev/null +++ b/pkg/models/events.go @@ -0,0 +1,58 @@ +package models + +import ( + "time" + + "github.com/google/uuid" +) + +// EventOrder represents sort order for events. 
+type EventOrder string + +const ( + EventOrderAsc EventOrder = "ASC" + EventOrderDesc EventOrder = "DESC" +) + +// Resource represents an observable business object. +type Resource map[string]string + +// RelatedResource represents a resource with a specific role in an event. +type RelatedResource map[string]string + +// Event represents a Prefect event. +type Event struct { + Occurred time.Time `json:"occurred"` + Event string `json:"event"` + Resource Resource `json:"resource"` + Related []RelatedResource `json:"related,omitempty"` + ID uuid.UUID `json:"id"` + Payload map[string]interface{} `json:"payload,omitempty"` + Received *time.Time `json:"received,omitempty"` +} + +// EventFilter represents filter criteria for querying events. +type EventFilter struct { + Occurred interface{} `json:"occurred,omitempty"` + Event interface{} `json:"event,omitempty"` + Resource interface{} `json:"resource,omitempty"` + Related interface{} `json:"related,omitempty"` + ID interface{} `json:"id,omitempty"` + Order EventOrder `json:"order,omitempty"` +} + +// EventPage represents a page of events. +type EventPage struct { + Events []Event `json:"events"` + Total int `json:"total"` + NextPage *string `json:"next_page"` +} + +// EventCount represents a count of events for a given filter value. +type EventCount struct { + Value string `json:"value"` + Label string `json:"label"` + Count int `json:"count"` + StartTime time.Time `json:"start_time"` + EndTime time.Time `json:"end_time"` +} diff --git a/pkg/models/flow_runs.go b/pkg/models/flow_runs.go new file mode 100644 index 0000000..d18c21d --- /dev/null +++ b/pkg/models/flow_runs.go @@ -0,0 +1,110 @@ +package models + +import ( + "encoding/json" + "time" + + "github.com/google/uuid" +) + +// FlowRun represents a Prefect flow run. 
+type FlowRun struct { + ID uuid.UUID `json:"id"` + Created *time.Time `json:"created"` + Updated *time.Time `json:"updated"` + FlowID uuid.UUID `json:"flow_id"` + Name string `json:"name"` + StateID *uuid.UUID `json:"state_id"` + DeploymentID *uuid.UUID `json:"deployment_id"` + WorkQueueID *uuid.UUID `json:"work_queue_id"` + WorkQueueName *string `json:"work_queue_name"` + FlowVersion *string `json:"flow_version"` + Parameters map[string]interface{} `json:"parameters,omitempty"` + IdempotencyKey *string `json:"idempotency_key"` + Context map[string]interface{} `json:"context,omitempty"` + Tags []string `json:"tags,omitempty"` + Labels map[string]interface{} `json:"labels,omitempty"` + ParentTaskRunID *uuid.UUID `json:"parent_task_run_id"` + StateType *StateType `json:"state_type"` + StateName *string `json:"state_name"` + RunCount int `json:"run_count"` + ExpectedStartTime *time.Time `json:"expected_start_time"` + NextScheduledStartTime *time.Time `json:"next_scheduled_start_time"` + StartTime *time.Time `json:"start_time"` + EndTime *time.Time `json:"end_time"` + TotalRunTime float64 `json:"total_run_time"` + State *State `json:"state"` +} + +// FlowRunCreate represents the request to create a flow run. +type FlowRunCreate struct { + FlowID uuid.UUID `json:"flow_id"` + Name string `json:"name,omitempty"` + State *StateCreate `json:"state,omitempty"` + Parameters map[string]interface{} `json:"parameters,omitempty"` + Context map[string]interface{} `json:"context,omitempty"` + Tags []string `json:"tags,omitempty"` + Labels map[string]interface{} `json:"labels,omitempty"` + IdempotencyKey *string `json:"idempotency_key,omitempty"` + WorkPoolName *string `json:"work_pool_name,omitempty"` + WorkQueueName *string `json:"work_queue_name,omitempty"` + DeploymentID *uuid.UUID `json:"deployment_id,omitempty"` +} + +// FlowRunUpdate represents the request to update a flow run. 
+type FlowRunUpdate struct { + Name *string `json:"name,omitempty"` +} + +// FlowRunFilter represents filter criteria for querying flow runs. +type FlowRunFilter struct { + FlowID *uuid.UUID `json:"flow_id,omitempty"` + DeploymentID *uuid.UUID `json:"deployment_id,omitempty"` + StateType *StateType `json:"state_type,omitempty"` + Tags []string `json:"tags,omitempty"` + Offset int `json:"offset,omitempty"` + Limit int `json:"limit,omitempty"` +} + +// FlowRunPolicy represents a flow run's retry policy. +type FlowRunPolicy struct { + MaxRetries int `json:"max_retries,omitempty"` + RetryDelaySeconds float64 `json:"retry_delay_seconds,omitempty"` + Retries *int `json:"retries,omitempty"` + RetryDelay *int `json:"retry_delay,omitempty"` + PauseKeys []string `json:"pause_keys,omitempty"` + Resuming *bool `json:"resuming,omitempty"` +} + +// CreatedBy represents information about who created an object. +type CreatedBy struct { + ID *uuid.UUID `json:"id,omitempty"` + Type *string `json:"type,omitempty"` + DisplayValue *string `json:"display_value,omitempty"` +} + +// UnmarshalJSON implements custom JSON unmarshaling for time fields. 
+func (fr *FlowRun) UnmarshalJSON(data []byte) error { + type Alias FlowRun + aux := &struct { + Created optTime `json:"created"` + Updated optTime `json:"updated"` + ExpectedStartTime optTime `json:"expected_start_time"` + NextScheduledStartTime optTime `json:"next_scheduled_start_time"` + StartTime optTime `json:"start_time"` + EndTime optTime `json:"end_time"` + *Alias + }{ + Alias: (*Alias)(fr), + } + if err := json.Unmarshal(data, aux); err != nil { + return err + } + fr.Created = aux.Created.V + fr.Updated = aux.Updated.V + fr.ExpectedStartTime = aux.ExpectedStartTime.V + fr.NextScheduledStartTime = aux.NextScheduledStartTime.V + fr.StartTime = aux.StartTime.V + fr.EndTime = aux.EndTime.V + return nil +} diff --git a/pkg/models/flows.go b/pkg/models/flows.go new file mode 100644 index 0000000..629837c --- /dev/null +++ b/pkg/models/flows.go @@ -0,0 +1,57 @@ +package models + +import ( + "encoding/json" + "time" + + "github.com/google/uuid" +) + +// Flow represents a Prefect flow. +type Flow struct { + ID uuid.UUID `json:"id"` + Created *time.Time `json:"created"` + Updated *time.Time `json:"updated"` + Name string `json:"name"` + Tags []string `json:"tags,omitempty"` + Labels map[string]interface{} `json:"labels,omitempty"` +} + +// FlowCreate represents the request to create a flow. +type FlowCreate struct { + Name string `json:"name"` + Tags []string `json:"tags,omitempty"` + Labels map[string]interface{} `json:"labels,omitempty"` +} + +// FlowUpdate represents the request to update a flow. +type FlowUpdate struct { + Tags *[]string `json:"tags,omitempty"` + Labels *map[string]interface{} `json:"labels,omitempty"` +} + +// FlowFilter represents filter criteria for querying flows. +type FlowFilter struct { + Name *string `json:"name,omitempty"` + Tags []string `json:"tags,omitempty"` + Offset int `json:"offset,omitempty"` + Limit int `json:"limit,omitempty"` +} + +// UnmarshalJSON implements custom JSON unmarshaling for time fields. 
+func (f *Flow) UnmarshalJSON(data []byte) error { + type Alias Flow + aux := &struct { + Created optTime `json:"created"` + Updated optTime `json:"updated"` + *Alias + }{ + Alias: (*Alias)(f), + } + if err := json.Unmarshal(data, aux); err != nil { + return err + } + f.Created = aux.Created.V + f.Updated = aux.Updated.V + return nil +} diff --git a/pkg/models/logs.go b/pkg/models/logs.go new file mode 100644 index 0000000..363e3bb --- /dev/null +++ b/pkg/models/logs.go @@ -0,0 +1,49 @@ +package models + +import ( + "encoding/json" + "time" + + "github.com/google/uuid" +) + +// Log represents a Prefect log entry. +type Log struct { + ID uuid.UUID `json:"id"` + Created *time.Time `json:"created"` + Updated *time.Time `json:"updated"` + Name string `json:"name"` + Level int `json:"level"` + Message string `json:"message"` + Timestamp time.Time `json:"timestamp"` + FlowRunID *uuid.UUID `json:"flow_run_id"` + TaskRunID *uuid.UUID `json:"task_run_id"` +} + +// LogCreate represents the request to create a log. +type LogCreate struct { + Name string `json:"name"` + Level int `json:"level"` + Message string `json:"message"` + Timestamp time.Time `json:"timestamp"` + FlowRunID *uuid.UUID `json:"flow_run_id,omitempty"` + TaskRunID *uuid.UUID `json:"task_run_id,omitempty"` +} + +// UnmarshalJSON implements custom JSON unmarshaling for time fields. 
+func (l *Log) UnmarshalJSON(data []byte) error { + type Alias Log + aux := &struct { + Created optTime `json:"created"` + Updated optTime `json:"updated"` + *Alias + }{ + Alias: (*Alias)(l), + } + if err := json.Unmarshal(data, aux); err != nil { + return err + } + l.Created = aux.Created.V + l.Updated = aux.Updated.V + return nil +} diff --git a/pkg/models/saved_searches.go b/pkg/models/saved_searches.go new file mode 100644 index 0000000..f9be2c3 --- /dev/null +++ b/pkg/models/saved_searches.go @@ -0,0 +1,50 @@ +package models + +import ( + "encoding/json" + "time" + + "github.com/google/uuid" +) + +// SavedSearch represents a Prefect saved search. +type SavedSearch struct { + ID uuid.UUID `json:"id"` + Created *time.Time `json:"created"` + Updated *time.Time `json:"updated"` + Name string `json:"name"` + Filters []SavedSearchFilter `json:"filters"` +} + +// SavedSearchCreate represents the request to create a saved search. +type SavedSearchCreate struct { + Name string `json:"name"` + Filters []SavedSearchFilter `json:"filters,omitempty"` +} + +// SavedSearchFilter represents a filter definition within a saved search. +type SavedSearchFilter struct { + Object string `json:"object"` + Property string `json:"property"` + Type string `json:"type"` + Operation string `json:"operation"` + Value interface{} `json:"value"` +} + +// UnmarshalJSON implements custom JSON unmarshaling for time fields. 
+func (ss *SavedSearch) UnmarshalJSON(data []byte) error { + type Alias SavedSearch + aux := &struct { + Created optTime `json:"created"` + Updated optTime `json:"updated"` + *Alias + }{ + Alias: (*Alias)(ss), + } + if err := json.Unmarshal(data, aux); err != nil { + return err + } + ss.Created = aux.Created.V + ss.Updated = aux.Updated.V + return nil +} diff --git a/pkg/models/schedules.go b/pkg/models/schedules.go new file mode 100644 index 0000000..200377a --- /dev/null +++ b/pkg/models/schedules.go @@ -0,0 +1,77 @@ +package models + +import ( + "encoding/json" + "time" + + "github.com/google/uuid" +) + +// DeploymentSchedule represents a schedule for a deployment. +type DeploymentSchedule struct { + ID uuid.UUID `json:"id"` + Created *time.Time `json:"created"` + Updated *time.Time `json:"updated"` + DeploymentID *uuid.UUID `json:"deployment_id"` + Schedule json.RawMessage `json:"schedule"` + Active bool `json:"active"` + MaxScheduledRuns *int `json:"max_scheduled_runs,omitempty"` + Parameters map[string]interface{} `json:"parameters,omitempty"` + Slug *string `json:"slug,omitempty"` +} + +// DeploymentScheduleCreate represents the request to create a deployment schedule. +type DeploymentScheduleCreate struct { + Active *bool `json:"active,omitempty"` + Schedule json.RawMessage `json:"schedule"` + MaxScheduledRuns *int `json:"max_scheduled_runs,omitempty"` + Parameters map[string]interface{} `json:"parameters,omitempty"` + Slug *string `json:"slug,omitempty"` +} + +// DeploymentScheduleUpdate represents the request to update a deployment schedule. +type DeploymentScheduleUpdate struct { + Active *bool `json:"active,omitempty"` + Schedule json.RawMessage `json:"schedule,omitempty"` + MaxScheduledRuns *int `json:"max_scheduled_runs,omitempty"` + Parameters map[string]interface{} `json:"parameters,omitempty"` + Slug *string `json:"slug,omitempty"` +} + +// IntervalSchedule represents an interval-based schedule. 
+type IntervalSchedule struct { + Interval float64 `json:"interval"` + AnchorDate *string `json:"anchor_date,omitempty"` + Timezone *string `json:"timezone,omitempty"` +} + +// CronSchedule represents a cron-based schedule. +type CronSchedule struct { + Cron string `json:"cron"` + Timezone *string `json:"timezone,omitempty"` + DayOr *bool `json:"day_or,omitempty"` +} + +// RRuleSchedule represents an RRule-based schedule. +type RRuleSchedule struct { + RRule string `json:"rrule"` + Timezone *string `json:"timezone,omitempty"` +} + +// UnmarshalJSON implements custom JSON unmarshaling for time fields. +func (ds *DeploymentSchedule) UnmarshalJSON(data []byte) error { + type Alias DeploymentSchedule + aux := &struct { + Created optTime `json:"created"` + Updated optTime `json:"updated"` + *Alias + }{ + Alias: (*Alias)(ds), + } + if err := json.Unmarshal(data, aux); err != nil { + return err + } + ds.Created = aux.Created.V + ds.Updated = aux.Updated.V + return nil +} diff --git a/pkg/models/states.go b/pkg/models/states.go new file mode 100644 index 0000000..a2668b3 --- /dev/null +++ b/pkg/models/states.go @@ -0,0 +1,55 @@ +package models + +import ( + "time" + + "github.com/google/uuid" +) + +// StateType represents the type of a flow or task run state. +type StateType string + +const ( + StateTypePending StateType = "PENDING" + StateTypeRunning StateType = "RUNNING" + StateTypeCompleted StateType = "COMPLETED" + StateTypeFailed StateType = "FAILED" + StateTypeCancelled StateType = "CANCELLED" + StateTypeCrashed StateType = "CRASHED" + StateTypePaused StateType = "PAUSED" + StateTypeScheduled StateType = "SCHEDULED" + StateTypeCancelling StateType = "CANCELLING" +) + +// State represents the state of a flow or task run. 
+type State struct { + ID uuid.UUID `json:"id"` + Type StateType `json:"type"` + Name *string `json:"name"` + Timestamp time.Time `json:"timestamp"` + Message *string `json:"message"` + Data interface{} `json:"data,omitempty"` + StateDetails map[string]interface{} `json:"state_details,omitempty"` +} + +// StateCreate represents the request to create a state. +type StateCreate struct { + Type StateType `json:"type"` + Name *string `json:"name,omitempty"` + Message *string `json:"message,omitempty"` + Data interface{} `json:"data,omitempty"` + StateDetails map[string]interface{} `json:"state_details,omitempty"` +} + +// StateDetails represents detailed information about a state. +type StateDetails struct { + FlowRunID *uuid.UUID `json:"flow_run_id,omitempty"` + TaskRunID *uuid.UUID `json:"task_run_id,omitempty"` + ChildFlowRunID *uuid.UUID `json:"child_flow_run_id,omitempty"` + ScheduledTime *time.Time `json:"scheduled_time,omitempty"` + CacheKey *string `json:"cache_key,omitempty"` + CacheExpiration *time.Time `json:"cache_expiration,omitempty"` + Deferred *bool `json:"deferred,omitempty"` + UntrackableResult bool `json:"untrackable_result,omitempty"` + PauseTimeout *time.Time `json:"pause_timeout,omitempty"` +} diff --git a/pkg/models/task_runs.go b/pkg/models/task_runs.go new file mode 100644 index 0000000..0618646 --- /dev/null +++ b/pkg/models/task_runs.go @@ -0,0 +1,83 @@ +package models + +import ( + "encoding/json" + "time" + + "github.com/google/uuid" +) + +// TaskRun represents a Prefect task run. 
+type TaskRun struct { + ID uuid.UUID `json:"id"` + Created *time.Time `json:"created"` + Updated *time.Time `json:"updated"` + Name string `json:"name"` + FlowRunID uuid.UUID `json:"flow_run_id"` + TaskKey string `json:"task_key"` + DynamicKey string `json:"dynamic_key"` + CacheKey *string `json:"cache_key"` + StartTime *time.Time `json:"start_time"` + EndTime *time.Time `json:"end_time"` + TotalRunTime float64 `json:"total_run_time"` + Status *StateType `json:"status"` + StateID *uuid.UUID `json:"state_id"` + Tags []string `json:"tags,omitempty"` + State *State `json:"state"` +} + +// TaskRunCreate represents the request to create a task run. +type TaskRunCreate struct { + Name string `json:"name"` + FlowRunID uuid.UUID `json:"flow_run_id"` + TaskKey string `json:"task_key"` + DynamicKey string `json:"dynamic_key"` + CacheKey *string `json:"cache_key,omitempty"` + State *StateCreate `json:"state,omitempty"` + Tags []string `json:"tags,omitempty"` +} + +// TaskRunUpdate represents the request to update a task run. +type TaskRunUpdate struct { + Name *string `json:"name,omitempty"` +} + +// TaskRunFilter represents filter criteria for querying task runs. +type TaskRunFilter struct { + FlowRunID *uuid.UUID `json:"flow_run_id,omitempty"` + StateType *StateType `json:"state_type,omitempty"` + Tags []string `json:"tags,omitempty"` + Offset int `json:"offset,omitempty"` + Limit int `json:"limit,omitempty"` +} + +// TaskRunPolicy represents a task run's retry policy. +type TaskRunPolicy struct { + MaxRetries int `json:"max_retries,omitempty"` + RetryDelaySeconds float64 `json:"retry_delay_seconds,omitempty"` + Retries *int `json:"retries,omitempty"` + RetryDelay interface{} `json:"retry_delay,omitempty"` + RetryJitterFactor *float64 `json:"retry_jitter_factor,omitempty"` +} + +// UnmarshalJSON implements custom JSON unmarshaling for time fields. 
+func (tr *TaskRun) UnmarshalJSON(data []byte) error { + type Alias TaskRun + aux := &struct { + Created optTime `json:"created"` + Updated optTime `json:"updated"` + StartTime optTime `json:"start_time"` + EndTime optTime `json:"end_time"` + *Alias + }{ + Alias: (*Alias)(tr), + } + if err := json.Unmarshal(data, aux); err != nil { + return err + } + tr.Created = aux.Created.V + tr.Updated = aux.Updated.V + tr.StartTime = aux.StartTime.V + tr.EndTime = aux.EndTime.V + return nil +} diff --git a/pkg/models/task_workers.go b/pkg/models/task_workers.go new file mode 100644 index 0000000..beac759 --- /dev/null +++ b/pkg/models/task_workers.go @@ -0,0 +1,15 @@ +package models + +import "time" + +// TaskWorkerFilter represents filter criteria for querying task workers. +type TaskWorkerFilter struct { + TaskKeys []string `json:"task_keys"` +} + +// TaskWorkerResponse represents a task worker response. +type TaskWorkerResponse struct { + Identifier string `json:"identifier"` + TaskKeys []string `json:"task_keys"` + Timestamp time.Time `json:"timestamp"` +} diff --git a/pkg/models/time.go b/pkg/models/time.go new file mode 100644 index 0000000..e904590 --- /dev/null +++ b/pkg/models/time.go @@ -0,0 +1,45 @@ +package models + +import ( + "encoding/json" + "time" +) + +// parseOptionalTime parses a time string, returning nil for empty strings. +// Supports RFC3339Nano and RFC3339 formats used by the Prefect API. +func parseOptionalTime(s string) (*time.Time, error) { + if s == "" { + return nil, nil + } + t, err := time.Parse(time.RFC3339Nano, s) + if err != nil { + t, err = time.Parse(time.RFC3339, s) + if err != nil { + return nil, err + } + } + return &t, nil +} + +// optTime handles JSON unmarshaling of nullable time fields that may be +// represented as empty strings or JSON null by the Prefect API. 
+type optTime struct { + V *time.Time +} + +func (o *optTime) UnmarshalJSON(data []byte) error { + if string(data) == "null" { + o.V = nil + return nil + } + var s string + if err := json.Unmarshal(data, &s); err != nil { + return err + } + v, err := parseOptionalTime(s) + if err != nil { + return err + } + o.V = v + return nil +} diff --git a/pkg/models/variables.go b/pkg/models/variables.go new file mode 100644 index 0000000..97618af --- /dev/null +++ b/pkg/models/variables.go @@ -0,0 +1,57 @@ +package models + +import ( + "encoding/json" + "time" + + "github.com/google/uuid" +) + +// Variable represents a Prefect variable. +type Variable struct { + ID uuid.UUID `json:"id"` + Created *time.Time `json:"created"` + Updated *time.Time `json:"updated"` + Name string `json:"name"` + Value string `json:"value"` + Tags []string `json:"tags,omitempty"` +} + +// VariableCreate represents the request to create a variable. +type VariableCreate struct { + Name string `json:"name"` + Value string `json:"value"` + Tags []string `json:"tags,omitempty"` +} + +// VariableUpdate represents the request to update a variable. +type VariableUpdate struct { + Value *string `json:"value,omitempty"` + Tags []string `json:"tags,omitempty"` +} + +// VariableFilter represents filter criteria for querying variables. +type VariableFilter struct { + Name *string `json:"name,omitempty"` + Tags []string `json:"tags,omitempty"` + Offset int `json:"offset,omitempty"` + Limit int `json:"limit,omitempty"` +} + +// UnmarshalJSON implements custom JSON unmarshaling for time fields. 
+func (v *Variable) UnmarshalJSON(data []byte) error { + type Alias Variable + aux := &struct { + Created optTime `json:"created"` + Updated optTime `json:"updated"` + *Alias + }{ + Alias: (*Alias)(v), + } + if err := json.Unmarshal(data, aux); err != nil { + return err + } + v.Created = aux.Created.V + v.Updated = aux.Updated.V + return nil +} diff --git a/pkg/models/work_pools.go b/pkg/models/work_pools.go new file mode 100644 index 0000000..ae6d397 --- /dev/null +++ b/pkg/models/work_pools.go @@ -0,0 +1,72 @@ +package models + +import ( + "encoding/json" + "time" + + "github.com/google/uuid" +) + +// WorkPoolStatus represents the status of a work pool. +type WorkPoolStatus string + +const ( + WorkPoolStatusReady WorkPoolStatus = "READY" + WorkPoolStatusNotReady WorkPoolStatus = "NOT_READY" + WorkPoolStatusPaused WorkPoolStatus = "PAUSED" +) + +// WorkPool represents a Prefect work pool. +type WorkPool struct { + ID uuid.UUID `json:"id"` + Created *time.Time `json:"created"` + Updated *time.Time `json:"updated"` + Name string `json:"name"` + Description *string `json:"description"` + Type string `json:"type"` + IsPaused bool `json:"is_paused"` +} + +// WorkPoolCreate represents the request to create a work pool. +type WorkPoolCreate struct { + Name string `json:"name"` + Description *string `json:"description,omitempty"` + Type string `json:"type"` + BaseJobTemplate map[string]interface{} `json:"base_job_template,omitempty"` + IsPaused bool `json:"is_paused,omitempty"` + ConcurrencyLimit *int `json:"concurrency_limit,omitempty"` +} + +// WorkPoolUpdate represents the request to update a work pool. +type WorkPoolUpdate struct { + Description *string `json:"description,omitempty"` + IsPaused *bool `json:"is_paused,omitempty"` + BaseJobTemplate map[string]interface{} `json:"base_job_template,omitempty"` + ConcurrencyLimit *int `json:"concurrency_limit,omitempty"` +} + +// WorkPoolFilter represents filter criteria for querying work pools. 
+type WorkPoolFilter struct { + Name *string `json:"name,omitempty"` + Type *string `json:"type,omitempty"` + Offset int `json:"offset,omitempty"` + Limit int `json:"limit,omitempty"` +} + +// UnmarshalJSON implements custom JSON unmarshaling for time fields. +func (wp *WorkPool) UnmarshalJSON(data []byte) error { + type Alias WorkPool + aux := &struct { + Created optTime `json:"created"` + Updated optTime `json:"updated"` + *Alias + }{ + Alias: (*Alias)(wp), + } + if err := json.Unmarshal(data, aux); err != nil { + return err + } + wp.Created = aux.Created.V + wp.Updated = aux.Updated.V + return nil +} diff --git a/pkg/models/work_queues.go b/pkg/models/work_queues.go new file mode 100644 index 0000000..6e38d03 --- /dev/null +++ b/pkg/models/work_queues.go @@ -0,0 +1,62 @@ +package models + +import ( + "encoding/json" + "time" + + "github.com/google/uuid" +) + +// WorkQueue represents a Prefect work queue. +type WorkQueue struct { + ID uuid.UUID `json:"id"` + Created *time.Time `json:"created"` + Updated *time.Time `json:"updated"` + Name string `json:"name"` + Description *string `json:"description"` + IsPaused bool `json:"is_paused"` + Priority int `json:"priority"` +} + +// WorkQueueCreate represents the request to create a work queue. +type WorkQueueCreate struct { + Name string `json:"name"` + Description string `json:"description,omitempty"` + IsPaused bool `json:"is_paused,omitempty"` + ConcurrencyLimit *int `json:"concurrency_limit,omitempty"` + Priority *int `json:"priority,omitempty"` +} + +// WorkQueueUpdate represents the request to update a work queue. +type WorkQueueUpdate struct { + Name *string `json:"name,omitempty"` + Description *string `json:"description,omitempty"` + IsPaused *bool `json:"is_paused,omitempty"` + ConcurrencyLimit *int `json:"concurrency_limit,omitempty"` + Priority *int `json:"priority,omitempty"` +} + +// WorkQueueFilter represents filter criteria for querying work queues. 
+type WorkQueueFilter struct { + Name *string `json:"name,omitempty"` + Offset int `json:"offset,omitempty"` + Limit int `json:"limit,omitempty"` +} + +// UnmarshalJSON implements custom JSON unmarshaling for time fields. +func (wq *WorkQueue) UnmarshalJSON(data []byte) error { + type Alias WorkQueue + aux := &struct { + Created optTime `json:"created"` + Updated optTime `json:"updated"` + *Alias + }{ + Alias: (*Alias)(wq), + } + if err := json.Unmarshal(data, aux); err != nil { + return err + } + wq.Created = aux.Created.V + wq.Updated = aux.Updated.V + return nil +}