Füge Modelle für Flows, FlowRuns, TaskRuns, WorkPools, WorkQueues, Deployments, Variablen, FlowRunStates, Logs, und Blocks samt zugehöriger Unmarshal-Logik und Zeitfeld-Unterstützung hinzu; ergänze Tests für die FlowRunStates-Service-Methoden.

This commit is contained in:
Gregor Schulte
2026-03-27 14:02:32 +01:00
parent 3aff707116
commit 57531a7d95
36 changed files with 3165 additions and 0 deletions

150
pkg/client/artifacts.go Normal file
View File

@@ -0,0 +1,150 @@
package client
import (
"context"
"fmt"
"git.schultes.dev/schultesdev/prefect-go/pkg/models"
"git.schultes.dev/schultesdev/prefect-go/pkg/pagination"
"github.com/google/uuid"
)
// ArtifactsService handles operations related to artifacts.
type ArtifactsService struct {
client *Client
}
// Create creates a new artifact.
func (s *ArtifactsService) Create(ctx context.Context, req *models.ArtifactCreate) (*models.Artifact, error) {
var artifact models.Artifact
if err := s.client.post(ctx, "/artifacts/", req, &artifact); err != nil {
return nil, fmt.Errorf("failed to create artifact: %w", err)
}
return &artifact, nil
}
// Get retrieves an artifact by ID.
func (s *ArtifactsService) Get(ctx context.Context, id uuid.UUID) (*models.Artifact, error) {
var artifact models.Artifact
path := joinPath("/artifacts", id.String())
if err := s.client.get(ctx, path, &artifact); err != nil {
return nil, fmt.Errorf("failed to get artifact: %w", err)
}
return &artifact, nil
}
// GetLatest retrieves the latest artifact for a given key.
func (s *ArtifactsService) GetLatest(ctx context.Context, key string) (*models.Artifact, error) {
var artifact models.Artifact
path := joinPath("/artifacts", key, "latest")
if err := s.client.get(ctx, path, &artifact); err != nil {
return nil, fmt.Errorf("failed to get latest artifact: %w", err)
}
return &artifact, nil
}
// Update updates an artifact.
func (s *ArtifactsService) Update(ctx context.Context, id uuid.UUID, req *models.ArtifactUpdate) error {
path := joinPath("/artifacts", id.String())
if err := s.client.patch(ctx, path, req, nil); err != nil {
return fmt.Errorf("failed to update artifact: %w", err)
}
return nil
}
// Delete deletes an artifact by ID.
func (s *ArtifactsService) Delete(ctx context.Context, id uuid.UUID) error {
path := joinPath("/artifacts", id.String())
if err := s.client.delete(ctx, path); err != nil {
return fmt.Errorf("failed to delete artifact: %w", err)
}
return nil
}
// List retrieves a list of artifacts with optional filtering.
func (s *ArtifactsService) List(ctx context.Context, filter *models.ArtifactFilter, offset, limit int) (*pagination.PaginatedResponse[models.Artifact], error) {
if filter == nil {
filter = &models.ArtifactFilter{}
}
filter.Offset = offset
filter.Limit = limit
type response struct {
Results []models.Artifact `json:"results"`
Count int `json:"count"`
}
var resp response
if err := s.client.post(ctx, "/artifacts/filter", filter, &resp); err != nil {
return nil, fmt.Errorf("failed to list artifacts: %w", err)
}
return &pagination.PaginatedResponse[models.Artifact]{
Results: resp.Results,
Count: resp.Count,
Limit: limit,
Offset: offset,
HasMore: offset+len(resp.Results) < resp.Count,
}, nil
}
// ListAll returns an iterator for all artifacts matching the filter.
func (s *ArtifactsService) ListAll(ctx context.Context, filter *models.ArtifactFilter) *pagination.Iterator[models.Artifact] {
fetchFunc := func(ctx context.Context, offset, limit int) (*pagination.PaginatedResponse[models.Artifact], error) {
return s.List(ctx, filter, offset, limit)
}
return pagination.NewIterator(fetchFunc, 100)
}
// Count returns the number of artifacts matching the filter.
func (s *ArtifactsService) Count(ctx context.Context, filter *models.ArtifactFilter) (int, error) {
if filter == nil {
filter = &models.ArtifactFilter{}
}
var count int
if err := s.client.post(ctx, "/artifacts/count", filter, &count); err != nil {
return 0, fmt.Errorf("failed to count artifacts: %w", err)
}
return count, nil
}
// ListLatest retrieves the latest artifact collections with optional filtering.
func (s *ArtifactsService) ListLatest(ctx context.Context, filter *models.ArtifactCollectionFilter, offset, limit int) (*pagination.PaginatedResponse[models.ArtifactCollection], error) {
if filter == nil {
filter = &models.ArtifactCollectionFilter{}
}
filter.Offset = offset
filter.Limit = limit
type response struct {
Results []models.ArtifactCollection `json:"results"`
Count int `json:"count"`
}
var resp response
if err := s.client.post(ctx, "/artifacts/latest/filter", filter, &resp); err != nil {
return nil, fmt.Errorf("failed to list latest artifacts: %w", err)
}
return &pagination.PaginatedResponse[models.ArtifactCollection]{
Results: resp.Results,
Count: resp.Count,
Limit: limit,
Offset: offset,
HasMore: offset+len(resp.Results) < resp.Count,
}, nil
}
// CountLatest returns the number of latest artifact collections matching the filter.
func (s *ArtifactsService) CountLatest(ctx context.Context, filter *models.ArtifactCollectionFilter) (int, error) {
if filter == nil {
filter = &models.ArtifactCollectionFilter{}
}
var count int
if err := s.client.post(ctx, "/artifacts/latest/count", filter, &count); err != nil {
return 0, fmt.Errorf("failed to count latest artifacts: %w", err)
}
return count, nil
}

View File

@@ -0,0 +1,139 @@
package client
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"git.schultes.dev/schultesdev/prefect-go/pkg/models"
"github.com/google/uuid"
)
func TestArtifactsService_Create(t *testing.T) {
expected := models.Artifact{
ID: uuid.New(),
Key: strPtr("test-key"),
}
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/api/artifacts/" {
t.Errorf("path = %v, want /api/artifacts/", r.URL.Path)
}
if r.Method != http.MethodPost {
t.Errorf("method = %v, want POST", r.Method)
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(expected)
}))
defer server.Close()
client, _ := NewClient(WithBaseURL(server.URL + "/api"))
ctx := context.Background()
artifact, err := client.Artifacts.Create(ctx, &models.ArtifactCreate{Key: strPtr("test-key")})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if artifact.ID != expected.ID {
t.Errorf("ID = %v, want %v", artifact.ID, expected.ID)
}
}
func TestArtifactsService_Get(t *testing.T) {
id := uuid.New()
expected := models.Artifact{ID: id, Key: strPtr("test-key")}
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
t.Errorf("method = %v, want GET", r.Method)
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(expected)
}))
defer server.Close()
client, _ := NewClient(WithBaseURL(server.URL + "/api"))
ctx := context.Background()
artifact, err := client.Artifacts.Get(ctx, id)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if artifact.ID != id {
t.Errorf("ID = %v, want %v", artifact.ID, id)
}
}
func TestArtifactsService_Delete(t *testing.T) {
id := uuid.New()
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodDelete {
t.Errorf("method = %v, want DELETE", r.Method)
}
w.WriteHeader(http.StatusNoContent)
}))
defer server.Close()
client, _ := NewClient(WithBaseURL(server.URL + "/api"))
ctx := context.Background()
err := client.Artifacts.Delete(ctx, id)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
}
func TestArtifactsService_List(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
t.Errorf("method = %v, want POST", r.Method)
}
if r.URL.Path != "/api/artifacts/filter" {
t.Errorf("path = %v, want /api/artifacts/filter", r.URL.Path)
}
w.Header().Set("Content-Type", "application/json")
resp := map[string]interface{}{
"results": []models.Artifact{{ID: uuid.New()}},
"count": 1,
}
json.NewEncoder(w).Encode(resp)
}))
defer server.Close()
client, _ := NewClient(WithBaseURL(server.URL + "/api"))
ctx := context.Background()
result, err := client.Artifacts.List(ctx, nil, 0, 10)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if len(result.Results) != 1 {
t.Errorf("results count = %v, want 1", len(result.Results))
}
}
func TestArtifactsService_Count(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.Write([]byte(`5`))
}))
defer server.Close()
client, _ := NewClient(WithBaseURL(server.URL + "/api"))
ctx := context.Background()
count, err := client.Artifacts.Count(ctx, nil)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if count != 5 {
t.Errorf("count = %v, want 5", count)
}
}
func strPtr(s string) *string {
return &s
}

120
pkg/client/automations.go Normal file
View File

@@ -0,0 +1,120 @@
package client
import (
"context"
"fmt"
"net/http"
"git.schultes.dev/schultesdev/prefect-go/pkg/models"
"github.com/google/uuid"
)
// AutomationsService handles operations related to automations.
type AutomationsService struct {
client *Client
}
// Create creates a new automation.
func (s *AutomationsService) Create(ctx context.Context, req *models.AutomationCreate) (*models.Automation, error) {
var automation models.Automation
if err := s.client.do(ctx, http.MethodPost, "/automations/", req, &automation); err != nil {
return nil, fmt.Errorf("failed to create automation: %w", err)
}
return &automation, nil
}
// Get retrieves an automation by ID.
func (s *AutomationsService) Get(ctx context.Context, id uuid.UUID) (*models.Automation, error) {
var automation models.Automation
path := joinPath("/automations", id.String())
if err := s.client.get(ctx, path, &automation); err != nil {
return nil, fmt.Errorf("failed to get automation: %w", err)
}
return &automation, nil
}
// Update fully replaces an automation.
func (s *AutomationsService) Update(ctx context.Context, id uuid.UUID, req *models.AutomationUpdate) (*models.Automation, error) {
var automation models.Automation
path := joinPath("/automations", id.String())
if err := s.client.do(ctx, http.MethodPut, path, req, &automation); err != nil {
return nil, fmt.Errorf("failed to update automation: %w", err)
}
return &automation, nil
}
// Patch partially updates an automation.
func (s *AutomationsService) Patch(ctx context.Context, id uuid.UUID, req *models.AutomationPartialUpdate) error {
path := joinPath("/automations", id.String())
if err := s.client.patch(ctx, path, req, nil); err != nil {
return fmt.Errorf("failed to patch automation: %w", err)
}
return nil
}
// Delete deletes an automation by ID.
func (s *AutomationsService) Delete(ctx context.Context, id uuid.UUID) error {
path := joinPath("/automations", id.String())
if err := s.client.delete(ctx, path); err != nil {
return fmt.Errorf("failed to delete automation: %w", err)
}
return nil
}
// List retrieves automations with optional filtering.
func (s *AutomationsService) List(ctx context.Context, filter *models.AutomationFilter) ([]models.Automation, error) {
if filter == nil {
filter = &models.AutomationFilter{}
}
var automations []models.Automation
if err := s.client.post(ctx, "/automations/filter", filter, &automations); err != nil {
return nil, fmt.Errorf("failed to list automations: %w", err)
}
return automations, nil
}
// Count returns the number of automations matching the filter.
func (s *AutomationsService) Count(ctx context.Context, filter *models.AutomationFilter) (int, error) {
if filter == nil {
filter = &models.AutomationFilter{}
}
var count int
if err := s.client.post(ctx, "/automations/count", filter, &count); err != nil {
return 0, fmt.Errorf("failed to count automations: %w", err)
}
return count, nil
}
// GetRelatedTo retrieves automations related to a specific resource.
func (s *AutomationsService) GetRelatedTo(ctx context.Context, resourceID string) ([]models.Automation, error) {
var automations []models.Automation
path := joinPath("/automations/related-to", resourceID)
if err := s.client.get(ctx, path, &automations); err != nil {
return nil, fmt.Errorf("failed to get related automations: %w", err)
}
return automations, nil
}
// DeleteOwnedBy deletes automations owned by a specific resource.
func (s *AutomationsService) DeleteOwnedBy(ctx context.Context, resourceID string) error {
path := joinPath("/automations/owned-by", resourceID)
if err := s.client.delete(ctx, path); err != nil {
return fmt.Errorf("failed to delete owned automations: %w", err)
}
return nil
}
// ValidateTemplate validates an automation template.
func (s *AutomationsService) ValidateTemplate(ctx context.Context, template string) error {
req := struct {
Template string `json:"template"`
}{
Template: template,
}
if err := s.client.post(ctx, "/automations/templates/validate", req, nil); err != nil {
return fmt.Errorf("failed to validate template: %w", err)
}
return nil
}

View File

@@ -0,0 +1,135 @@
package client
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"git.schultes.dev/schultesdev/prefect-go/pkg/models"
"github.com/google/uuid"
)
func TestAutomationsService_Create(t *testing.T) {
expected := models.Automation{
ID: uuid.New(),
Name: "test-automation",
}
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/api/automations/" {
t.Errorf("path = %v, want /api/automations/", r.URL.Path)
}
if r.Method != http.MethodPost {
t.Errorf("method = %v, want POST", r.Method)
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusCreated)
json.NewEncoder(w).Encode(expected)
}))
defer server.Close()
client, _ := NewClient(WithBaseURL(server.URL + "/api"))
ctx := context.Background()
automation, err := client.Automations.Create(ctx, &models.AutomationCreate{
Name: "test-automation",
Trigger: json.RawMessage(`{"type":"event"}`),
Actions: []json.RawMessage{json.RawMessage(`{"type":"do-nothing"}`)},
})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if automation.Name != expected.Name {
t.Errorf("Name = %v, want %v", automation.Name, expected.Name)
}
}
func TestAutomationsService_Get(t *testing.T) {
id := uuid.New()
expected := models.Automation{ID: id, Name: "test"}
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
t.Errorf("method = %v, want GET", r.Method)
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(expected)
}))
defer server.Close()
client, _ := NewClient(WithBaseURL(server.URL + "/api"))
ctx := context.Background()
automation, err := client.Automations.Get(ctx, id)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if automation.ID != id {
t.Errorf("ID = %v, want %v", automation.ID, id)
}
}
func TestAutomationsService_Delete(t *testing.T) {
id := uuid.New()
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodDelete {
t.Errorf("method = %v, want DELETE", r.Method)
}
w.WriteHeader(http.StatusNoContent)
}))
defer server.Close()
client, _ := NewClient(WithBaseURL(server.URL + "/api"))
ctx := context.Background()
err := client.Automations.Delete(ctx, id)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
}
func TestAutomationsService_List(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/api/automations/filter" {
t.Errorf("path = %v, want /api/automations/filter", r.URL.Path)
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode([]models.Automation{{ID: uuid.New(), Name: "test"}})
}))
defer server.Close()
client, _ := NewClient(WithBaseURL(server.URL + "/api"))
ctx := context.Background()
automations, err := client.Automations.List(ctx, nil)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if len(automations) != 1 {
t.Errorf("count = %v, want 1", len(automations))
}
}
func TestAutomationsService_Patch(t *testing.T) {
id := uuid.New()
enabled := true
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPatch {
t.Errorf("method = %v, want PATCH", r.Method)
}
w.WriteHeader(http.StatusNoContent)
}))
defer server.Close()
client, _ := NewClient(WithBaseURL(server.URL + "/api"))
ctx := context.Background()
err := client.Automations.Patch(ctx, id, &models.AutomationPartialUpdate{Enabled: &enabled})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
}

View File

@@ -0,0 +1,114 @@
package client
import (
"context"
"fmt"
"git.schultes.dev/schultesdev/prefect-go/pkg/models"
"github.com/google/uuid"
)
// ConcurrencyLimitsService handles operations related to v1 concurrency limits.
type ConcurrencyLimitsService struct {
client *Client
}
// Create creates a new concurrency limit.
func (s *ConcurrencyLimitsService) Create(ctx context.Context, req *models.ConcurrencyLimitCreate) (*models.ConcurrencyLimit, error) {
var limit models.ConcurrencyLimit
if err := s.client.post(ctx, "/concurrency_limits/", req, &limit); err != nil {
return nil, fmt.Errorf("failed to create concurrency limit: %w", err)
}
return &limit, nil
}
// Get retrieves a concurrency limit by ID.
func (s *ConcurrencyLimitsService) Get(ctx context.Context, id uuid.UUID) (*models.ConcurrencyLimit, error) {
var limit models.ConcurrencyLimit
path := joinPath("/concurrency_limits", id.String())
if err := s.client.get(ctx, path, &limit); err != nil {
return nil, fmt.Errorf("failed to get concurrency limit: %w", err)
}
return &limit, nil
}
// GetByTag retrieves a concurrency limit by tag.
func (s *ConcurrencyLimitsService) GetByTag(ctx context.Context, tag string) (*models.ConcurrencyLimit, error) {
var limit models.ConcurrencyLimit
path := joinPath("/concurrency_limits/tag", tag)
if err := s.client.get(ctx, path, &limit); err != nil {
return nil, fmt.Errorf("failed to get concurrency limit by tag: %w", err)
}
return &limit, nil
}
// Delete deletes a concurrency limit by ID.
func (s *ConcurrencyLimitsService) Delete(ctx context.Context, id uuid.UUID) error {
path := joinPath("/concurrency_limits", id.String())
if err := s.client.delete(ctx, path); err != nil {
return fmt.Errorf("failed to delete concurrency limit: %w", err)
}
return nil
}
// DeleteByTag deletes a concurrency limit by tag.
func (s *ConcurrencyLimitsService) DeleteByTag(ctx context.Context, tag string) error {
path := joinPath("/concurrency_limits/tag", tag)
if err := s.client.delete(ctx, path); err != nil {
return fmt.Errorf("failed to delete concurrency limit by tag: %w", err)
}
return nil
}
// ResetByTag resets a concurrency limit by tag.
func (s *ConcurrencyLimitsService) ResetByTag(ctx context.Context, tag string) error {
path := joinPath("/concurrency_limits/tag", tag, "reset")
if err := s.client.post(ctx, path, nil, nil); err != nil {
return fmt.Errorf("failed to reset concurrency limit: %w", err)
}
return nil
}
// List retrieves concurrency limits with optional filtering.
func (s *ConcurrencyLimitsService) List(ctx context.Context) ([]models.ConcurrencyLimit, error) {
var limits []models.ConcurrencyLimit
if err := s.client.post(ctx, "/concurrency_limits/filter", nil, &limits); err != nil {
return nil, fmt.Errorf("failed to list concurrency limits: %w", err)
}
return limits, nil
}
// Increment increments concurrency limit slots.
func (s *ConcurrencyLimitsService) Increment(ctx context.Context, names []string, slots int, mode string) ([]models.MinimalConcurrencyLimitResponse, error) {
req := struct {
Names []string `json:"names"`
Slots int `json:"slots"`
Mode string `json:"mode,omitempty"`
}{
Names: names,
Slots: slots,
Mode: mode,
}
var resp []models.MinimalConcurrencyLimitResponse
if err := s.client.post(ctx, "/concurrency_limits/increment", req, &resp); err != nil {
return nil, fmt.Errorf("failed to increment concurrency limits: %w", err)
}
return resp, nil
}
// Decrement decrements concurrency limit slots.
func (s *ConcurrencyLimitsService) Decrement(ctx context.Context, names []string, slots int) error {
req := struct {
Names []string `json:"names"`
Slots int `json:"slots"`
}{
Names: names,
Slots: slots,
}
if err := s.client.post(ctx, "/concurrency_limits/decrement", req, nil); err != nil {
return fmt.Errorf("failed to decrement concurrency limits: %w", err)
}
return nil
}

View File

@@ -0,0 +1,126 @@
package client
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"git.schultes.dev/schultesdev/prefect-go/pkg/models"
"github.com/google/uuid"
)
func TestConcurrencyLimitsService_Create(t *testing.T) {
expected := models.ConcurrencyLimit{
ID: uuid.New(),
Tag: "test-tag",
ConcurrencyLimit: 10,
}
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
t.Errorf("method = %v, want POST", r.Method)
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(expected)
}))
defer server.Close()
client, _ := NewClient(WithBaseURL(server.URL + "/api"))
ctx := context.Background()
limit, err := client.ConcurrencyLimits.Create(ctx, &models.ConcurrencyLimitCreate{
Tag: "test-tag",
ConcurrencyLimit: 10,
})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if limit.Tag != "test-tag" {
t.Errorf("Tag = %v, want test-tag", limit.Tag)
}
}
func TestConcurrencyLimitsService_Get(t *testing.T) {
id := uuid.New()
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
t.Errorf("method = %v, want GET", r.Method)
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(models.ConcurrencyLimit{ID: id, Tag: "test"})
}))
defer server.Close()
client, _ := NewClient(WithBaseURL(server.URL + "/api"))
ctx := context.Background()
limit, err := client.ConcurrencyLimits.Get(ctx, id)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if limit.ID != id {
t.Errorf("ID = %v, want %v", limit.ID, id)
}
}
func TestConcurrencyLimitsService_GetByTag(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
t.Errorf("method = %v, want GET", r.Method)
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(models.ConcurrencyLimit{ID: uuid.New(), Tag: "my-tag"})
}))
defer server.Close()
client, _ := NewClient(WithBaseURL(server.URL + "/api"))
ctx := context.Background()
limit, err := client.ConcurrencyLimits.GetByTag(ctx, "my-tag")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if limit.Tag != "my-tag" {
t.Errorf("Tag = %v, want my-tag", limit.Tag)
}
}
func TestConcurrencyLimitsService_Delete(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodDelete {
t.Errorf("method = %v, want DELETE", r.Method)
}
w.WriteHeader(http.StatusNoContent)
}))
defer server.Close()
client, _ := NewClient(WithBaseURL(server.URL + "/api"))
ctx := context.Background()
err := client.ConcurrencyLimits.Delete(ctx, uuid.New())
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
}
func TestConcurrencyLimitsService_List(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode([]models.ConcurrencyLimit{{ID: uuid.New(), Tag: "test"}})
}))
defer server.Close()
client, _ := NewClient(WithBaseURL(server.URL + "/api"))
ctx := context.Background()
limits, err := client.ConcurrencyLimits.List(ctx)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if len(limits) != 1 {
t.Errorf("count = %v, want 1", len(limits))
}
}

View File

@@ -0,0 +1,141 @@
package client
import (
"context"
"fmt"
"net/http"
"git.schultes.dev/schultesdev/prefect-go/pkg/models"
"github.com/google/uuid"
)
// ConcurrencyLimitsV2Service handles operations related to v2 concurrency limits.
type ConcurrencyLimitsV2Service struct {
client *Client
}
// Create creates a new v2 concurrency limit.
func (s *ConcurrencyLimitsV2Service) Create(ctx context.Context, req *models.ConcurrencyLimitV2Create) (*models.ConcurrencyLimitV2, error) {
var limit models.ConcurrencyLimitV2
if err := s.client.do(ctx, http.MethodPost, "/v2/concurrency_limits/", req, &limit); err != nil {
return nil, fmt.Errorf("failed to create v2 concurrency limit: %w", err)
}
return &limit, nil
}
// Get retrieves a v2 concurrency limit by ID or name.
func (s *ConcurrencyLimitsV2Service) Get(ctx context.Context, idOrName string) (*models.ConcurrencyLimitV2, error) {
var limit models.ConcurrencyLimitV2
path := joinPath("/v2/concurrency_limits", idOrName)
if err := s.client.get(ctx, path, &limit); err != nil {
return nil, fmt.Errorf("failed to get v2 concurrency limit: %w", err)
}
return &limit, nil
}
// Update updates a v2 concurrency limit.
func (s *ConcurrencyLimitsV2Service) Update(ctx context.Context, idOrName string, req *models.ConcurrencyLimitV2Update) error {
path := joinPath("/v2/concurrency_limits", idOrName)
if err := s.client.patch(ctx, path, req, nil); err != nil {
return fmt.Errorf("failed to update v2 concurrency limit: %w", err)
}
return nil
}
// Delete deletes a v2 concurrency limit by ID or name.
func (s *ConcurrencyLimitsV2Service) Delete(ctx context.Context, idOrName string) error {
path := joinPath("/v2/concurrency_limits", idOrName)
if err := s.client.delete(ctx, path); err != nil {
return fmt.Errorf("failed to delete v2 concurrency limit: %w", err)
}
return nil
}
// List retrieves all v2 concurrency limits.
func (s *ConcurrencyLimitsV2Service) List(ctx context.Context) ([]models.GlobalConcurrencyLimitResponse, error) {
var limits []models.GlobalConcurrencyLimitResponse
if err := s.client.post(ctx, "/v2/concurrency_limits/filter", nil, &limits); err != nil {
return nil, fmt.Errorf("failed to list v2 concurrency limits: %w", err)
}
return limits, nil
}
// Increment increments active slots for v2 concurrency limits.
func (s *ConcurrencyLimitsV2Service) Increment(ctx context.Context, names []string, slots int, mode string) error {
req := struct {
Names []string `json:"names"`
Slots int `json:"slots"`
Mode string `json:"mode,omitempty"`
}{
Names: names,
Slots: slots,
Mode: mode,
}
if err := s.client.post(ctx, "/v2/concurrency_limits/increment", req, nil); err != nil {
return fmt.Errorf("failed to increment v2 concurrency limits: %w", err)
}
return nil
}
// IncrementWithLease increments active slots and returns a lease.
func (s *ConcurrencyLimitsV2Service) IncrementWithLease(ctx context.Context, names []string, slots int, mode string) (*models.ConcurrencyLimitWithLeaseResponse, error) {
req := struct {
Names []string `json:"names"`
Slots int `json:"slots"`
Mode string `json:"mode,omitempty"`
}{
Names: names,
Slots: slots,
Mode: mode,
}
var resp models.ConcurrencyLimitWithLeaseResponse
if err := s.client.post(ctx, "/v2/concurrency_limits/increment-with-lease", req, &resp); err != nil {
return nil, fmt.Errorf("failed to increment v2 concurrency limits with lease: %w", err)
}
return &resp, nil
}
// Decrement decrements active slots for v2 concurrency limits.
func (s *ConcurrencyLimitsV2Service) Decrement(ctx context.Context, names []string, slots int) error {
req := struct {
Names []string `json:"names"`
Slots int `json:"slots"`
}{
Names: names,
Slots: slots,
}
if err := s.client.post(ctx, "/v2/concurrency_limits/decrement", req, nil); err != nil {
return fmt.Errorf("failed to decrement v2 concurrency limits: %w", err)
}
return nil
}
// DecrementWithLease decrements active slots using a lease ID.
func (s *ConcurrencyLimitsV2Service) DecrementWithLease(ctx context.Context, names []string, slots int, leaseID uuid.UUID) error {
req := struct {
Names []string `json:"names"`
Slots int `json:"slots"`
LeaseID uuid.UUID `json:"lease_id"`
}{
Names: names,
Slots: slots,
LeaseID: leaseID,
}
if err := s.client.post(ctx, "/v2/concurrency_limits/decrement-with-lease", req, nil); err != nil {
return fmt.Errorf("failed to decrement v2 concurrency limits with lease: %w", err)
}
return nil
}
// RenewLease renews a concurrency lease.
func (s *ConcurrencyLimitsV2Service) RenewLease(ctx context.Context, leaseID uuid.UUID) error {
path := joinPath("/v2/concurrency_limits/leases", leaseID.String(), "renew")
if err := s.client.post(ctx, path, nil, nil); err != nil {
return fmt.Errorf("failed to renew concurrency lease: %w", err)
}
return nil
}

View File

@@ -0,0 +1,128 @@
package client
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"git.schultes.dev/schultesdev/prefect-go/pkg/models"
"github.com/google/uuid"
)
func TestConcurrencyLimitsV2Service_Create(t *testing.T) {
expected := models.ConcurrencyLimitV2{
ID: uuid.New(),
Name: "test-limit",
Limit: 5,
}
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/api/v2/concurrency_limits/" {
t.Errorf("path = %v, want /api/v2/concurrency_limits/", r.URL.Path)
}
if r.Method != http.MethodPost {
t.Errorf("method = %v, want POST", r.Method)
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusCreated)
json.NewEncoder(w).Encode(expected)
}))
defer server.Close()
client, _ := NewClient(WithBaseURL(server.URL + "/api"))
ctx := context.Background()
limit, err := client.ConcurrencyLimitsV2.Create(ctx, &models.ConcurrencyLimitV2Create{
Name: "test-limit",
Limit: 5,
})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if limit.Name != "test-limit" {
t.Errorf("Name = %v, want test-limit", limit.Name)
}
}
func TestConcurrencyLimitsV2Service_Get(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
t.Errorf("method = %v, want GET", r.Method)
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(models.ConcurrencyLimitV2{ID: uuid.New(), Name: "test"})
}))
defer server.Close()
client, _ := NewClient(WithBaseURL(server.URL + "/api"))
ctx := context.Background()
limit, err := client.ConcurrencyLimitsV2.Get(ctx, "test")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if limit.Name != "test" {
t.Errorf("Name = %v, want test", limit.Name)
}
}
func TestConcurrencyLimitsV2Service_Delete(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodDelete {
t.Errorf("method = %v, want DELETE", r.Method)
}
w.WriteHeader(http.StatusNoContent)
}))
defer server.Close()
client, _ := NewClient(WithBaseURL(server.URL + "/api"))
ctx := context.Background()
err := client.ConcurrencyLimitsV2.Delete(ctx, "test")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
}
func TestConcurrencyLimitsV2Service_List(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode([]models.GlobalConcurrencyLimitResponse{
{ID: uuid.New(), Name: "test", Limit: 5, ActiveSlots: 0},
})
}))
defer server.Close()
client, _ := NewClient(WithBaseURL(server.URL + "/api"))
ctx := context.Background()
limits, err := client.ConcurrencyLimitsV2.List(ctx)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if len(limits) != 1 {
t.Errorf("count = %v, want 1", len(limits))
}
}
func TestConcurrencyLimitsV2Service_RenewLease(t *testing.T) {
leaseID := uuid.New()
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
t.Errorf("method = %v, want POST", r.Method)
}
w.WriteHeader(http.StatusOK)
}))
defer server.Close()
client, _ := NewClient(WithBaseURL(server.URL + "/api"))
ctx := context.Background()
err := client.ConcurrencyLimitsV2.RenewLease(ctx, leaseID)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
}

57
pkg/client/events.go Normal file
View File

@@ -0,0 +1,57 @@
package client
import (
"context"
"fmt"
"git.schultes.dev/schultesdev/prefect-go/pkg/models"
)
// EventsService handles operations related to events.
type EventsService struct {
client *Client
}
// Create creates new events.
func (s *EventsService) Create(ctx context.Context, events []models.Event) error {
if err := s.client.post(ctx, "/events", events, nil); err != nil {
return fmt.Errorf("failed to create events: %w", err)
}
return nil
}
// List retrieves events with filtering.
func (s *EventsService) List(ctx context.Context, filter *models.EventFilter) (*models.EventPage, error) {
if filter == nil {
filter = &models.EventFilter{}
}
var page models.EventPage
if err := s.client.post(ctx, "/events/filter", filter, &page); err != nil {
return nil, fmt.Errorf("failed to list events: %w", err)
}
return &page, nil
}
// NextPage retrieves the next page of events using a page token.
func (s *EventsService) NextPage(ctx context.Context) (*models.EventPage, error) {
var page models.EventPage
if err := s.client.get(ctx, "/events/filter/next", &page); err != nil {
return nil, fmt.Errorf("failed to get next events page: %w", err)
}
return &page, nil
}
// CountBy counts events grouped by a countable field.
func (s *EventsService) CountBy(ctx context.Context, countable string, filter *models.EventFilter) ([]models.EventCount, error) {
if filter == nil {
filter = &models.EventFilter{}
}
path := joinPath("/events/count-by", countable)
var counts []models.EventCount
if err := s.client.post(ctx, path, filter, &counts); err != nil {
return nil, fmt.Errorf("failed to count events: %w", err)
}
return counts, nil
}

62
pkg/client/events_test.go Normal file
View File

@@ -0,0 +1,62 @@
package client
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"git.schultes.dev/schultesdev/prefect-go/pkg/models"
"github.com/google/uuid"
)
func TestEventsService_List(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/api/events/filter" {
t.Errorf("path = %v, want /api/events/filter", r.URL.Path)
}
if r.Method != http.MethodPost {
t.Errorf("method = %v, want POST", r.Method)
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(models.EventPage{
Events: []models.Event{{ID: uuid.New(), Event: "test.event"}},
Total: 1,
})
}))
defer server.Close()
client, _ := NewClient(WithBaseURL(server.URL + "/api"))
ctx := context.Background()
page, err := client.Events.List(ctx, nil)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if page.Total != 1 {
t.Errorf("total = %v, want 1", page.Total)
}
}
func TestEventsService_CountBy(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
t.Errorf("method = %v, want POST", r.Method)
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode([]models.EventCount{{Value: "test", Count: 5}})
}))
defer server.Close()
client, _ := NewClient(WithBaseURL(server.URL + "/api"))
ctx := context.Background()
counts, err := client.Events.CountBy(ctx, "event", nil)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if len(counts) != 1 {
t.Errorf("count = %v, want 1", len(counts))
}
}

View File

@@ -0,0 +1,33 @@
package client
import (
"context"
"fmt"
"git.schultes.dev/schultesdev/prefect-go/pkg/models"
"github.com/google/uuid"
)
// FlowRunStatesService handles operations related to flow run states.
type FlowRunStatesService struct {
client *Client
}
// List retrieves all flow run states.
func (s *FlowRunStatesService) List(ctx context.Context) ([]models.State, error) {
var states []models.State
if err := s.client.get(ctx, "/flow_run_states/", &states); err != nil {
return nil, fmt.Errorf("failed to list flow run states: %w", err)
}
return states, nil
}
// Get retrieves a flow run state by ID.
func (s *FlowRunStatesService) Get(ctx context.Context, id uuid.UUID) (*models.State, error) {
var state models.State
path := joinPath("/flow_run_states", id.String())
if err := s.client.get(ctx, path, &state); err != nil {
return nil, fmt.Errorf("failed to get flow run state: %w", err)
}
return &state, nil
}

View File

@@ -0,0 +1,63 @@
package client
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"git.schultes.dev/schultesdev/prefect-go/pkg/models"
"github.com/google/uuid"
)
func TestFlowRunStatesService_List(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/api/flow_run_states/" {
t.Errorf("path = %v, want /api/flow_run_states/", r.URL.Path)
}
if r.Method != http.MethodGet {
t.Errorf("method = %v, want GET", r.Method)
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode([]models.State{
{ID: uuid.New(), Type: models.StateTypeCompleted},
})
}))
defer server.Close()
client, _ := NewClient(WithBaseURL(server.URL + "/api"))
ctx := context.Background()
states, err := client.FlowRunStates.List(ctx)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if len(states) != 1 {
t.Errorf("count = %v, want 1", len(states))
}
}
func TestFlowRunStatesService_Get(t *testing.T) {
id := uuid.New()
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
t.Errorf("method = %v, want GET", r.Method)
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(models.State{ID: id, Type: models.StateTypeRunning})
}))
defer server.Close()
client, _ := NewClient(WithBaseURL(server.URL + "/api"))
ctx := context.Background()
state, err := client.FlowRunStates.Get(ctx, id)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if state.ID != id {
t.Errorf("ID = %v, want %v", state.ID, id)
}
}

View File

@@ -0,0 +1,52 @@
package client
import (
"context"
"fmt"
"net/http"
"git.schultes.dev/schultesdev/prefect-go/pkg/models"
"github.com/google/uuid"
)
// SavedSearchesService handles operations related to saved searches.
type SavedSearchesService struct {
client *Client
}
// Create creates a new saved search.
func (s *SavedSearchesService) Create(ctx context.Context, req *models.SavedSearchCreate) (*models.SavedSearch, error) {
var savedSearch models.SavedSearch
if err := s.client.do(ctx, http.MethodPut, "/saved_searches/", req, &savedSearch); err != nil {
return nil, fmt.Errorf("failed to create saved search: %w", err)
}
return &savedSearch, nil
}
// Get retrieves a saved search by ID.
func (s *SavedSearchesService) Get(ctx context.Context, id uuid.UUID) (*models.SavedSearch, error) {
var savedSearch models.SavedSearch
path := joinPath("/saved_searches", id.String())
if err := s.client.get(ctx, path, &savedSearch); err != nil {
return nil, fmt.Errorf("failed to get saved search: %w", err)
}
return &savedSearch, nil
}
// Delete deletes a saved search by ID.
func (s *SavedSearchesService) Delete(ctx context.Context, id uuid.UUID) error {
path := joinPath("/saved_searches", id.String())
if err := s.client.delete(ctx, path); err != nil {
return fmt.Errorf("failed to delete saved search: %w", err)
}
return nil
}
// List retrieves all saved searches.
func (s *SavedSearchesService) List(ctx context.Context) ([]models.SavedSearch, error) {
var savedSearches []models.SavedSearch
if err := s.client.post(ctx, "/saved_searches/filter", nil, &savedSearches); err != nil {
return nil, fmt.Errorf("failed to list saved searches: %w", err)
}
return savedSearches, nil
}

View File

@@ -0,0 +1,103 @@
package client
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"git.schultes.dev/schultesdev/prefect-go/pkg/models"
"github.com/google/uuid"
)
func TestSavedSearchesService_Create(t *testing.T) {
expected := models.SavedSearch{ID: uuid.New(), Name: "test-search"}
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPut {
t.Errorf("method = %v, want PUT", r.Method)
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(expected)
}))
defer server.Close()
client, _ := NewClient(WithBaseURL(server.URL + "/api"))
ctx := context.Background()
search, err := client.SavedSearches.Create(ctx, &models.SavedSearchCreate{Name: "test-search"})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if search.Name != expected.Name {
t.Errorf("Name = %v, want %v", search.Name, expected.Name)
}
}
func TestSavedSearchesService_Get(t *testing.T) {
id := uuid.New()
expected := models.SavedSearch{ID: id, Name: "test"}
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
t.Errorf("method = %v, want GET", r.Method)
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(expected)
}))
defer server.Close()
client, _ := NewClient(WithBaseURL(server.URL + "/api"))
ctx := context.Background()
search, err := client.SavedSearches.Get(ctx, id)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if search.ID != id {
t.Errorf("ID = %v, want %v", search.ID, id)
}
}
func TestSavedSearchesService_Delete(t *testing.T) {
id := uuid.New()
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodDelete {
t.Errorf("method = %v, want DELETE", r.Method)
}
w.WriteHeader(http.StatusNoContent)
}))
defer server.Close()
client, _ := NewClient(WithBaseURL(server.URL + "/api"))
ctx := context.Background()
err := client.SavedSearches.Delete(ctx, id)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
}
func TestSavedSearchesService_List(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/api/saved_searches/filter" {
t.Errorf("path = %v, want /api/saved_searches/filter", r.URL.Path)
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode([]models.SavedSearch{{ID: uuid.New(), Name: "test"}})
}))
defer server.Close()
client, _ := NewClient(WithBaseURL(server.URL + "/api"))
ctx := context.Background()
searches, err := client.SavedSearches.List(ctx)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if len(searches) != 1 {
t.Errorf("count = %v, want 1", len(searches))
}
}

View File

@@ -0,0 +1,33 @@
package client
import (
"context"
"fmt"
"git.schultes.dev/schultesdev/prefect-go/pkg/models"
"github.com/google/uuid"
)
// TaskRunStatesService handles operations related to task run states.
type TaskRunStatesService struct {
client *Client
}
// List retrieves all task run states.
func (s *TaskRunStatesService) List(ctx context.Context) ([]models.State, error) {
var states []models.State
if err := s.client.get(ctx, "/task_run_states/", &states); err != nil {
return nil, fmt.Errorf("failed to list task run states: %w", err)
}
return states, nil
}
// Get retrieves a task run state by ID.
func (s *TaskRunStatesService) Get(ctx context.Context, id uuid.UUID) (*models.State, error) {
var state models.State
path := joinPath("/task_run_states", id.String())
if err := s.client.get(ctx, path, &state); err != nil {
return nil, fmt.Errorf("failed to get task run state: %w", err)
}
return &state, nil
}

View File

@@ -0,0 +1,63 @@
package client
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"git.schultes.dev/schultesdev/prefect-go/pkg/models"
"github.com/google/uuid"
)
func TestTaskRunStatesService_List(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/api/task_run_states/" {
t.Errorf("path = %v, want /api/task_run_states/", r.URL.Path)
}
if r.Method != http.MethodGet {
t.Errorf("method = %v, want GET", r.Method)
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode([]models.State{
{ID: uuid.New(), Type: models.StateTypeFailed},
})
}))
defer server.Close()
client, _ := NewClient(WithBaseURL(server.URL + "/api"))
ctx := context.Background()
states, err := client.TaskRunStates.List(ctx)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if len(states) != 1 {
t.Errorf("count = %v, want 1", len(states))
}
}
func TestTaskRunStatesService_Get(t *testing.T) {
id := uuid.New()
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
t.Errorf("method = %v, want GET", r.Method)
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(models.State{ID: id, Type: models.StateTypePending})
}))
defer server.Close()
client, _ := NewClient(WithBaseURL(server.URL + "/api"))
ctx := context.Background()
state, err := client.TaskRunStates.Get(ctx, id)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if state.ID != id {
t.Errorf("ID = %v, want %v", state.ID, id)
}
}

View File

@@ -0,0 +1,26 @@
package client
import (
"context"
"fmt"
"git.schultes.dev/schultesdev/prefect-go/pkg/models"
)
// TaskWorkersService handles operations related to task workers.
type TaskWorkersService struct {
client *Client
}
// List retrieves task workers matching the filter.
func (s *TaskWorkersService) List(ctx context.Context, filter *models.TaskWorkerFilter) ([]models.TaskWorkerResponse, error) {
if filter == nil {
filter = &models.TaskWorkerFilter{}
}
var workers []models.TaskWorkerResponse
if err := s.client.post(ctx, "/task_workers/filter", filter, &workers); err != nil {
return nil, fmt.Errorf("failed to list task workers: %w", err)
}
return workers, nil
}

View File

@@ -0,0 +1,41 @@
package client
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"git.schultes.dev/schultesdev/prefect-go/pkg/models"
)
func TestTaskWorkersService_List(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/api/task_workers/filter" {
t.Errorf("path = %v, want /api/task_workers/filter", r.URL.Path)
}
if r.Method != http.MethodPost {
t.Errorf("method = %v, want POST", r.Method)
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode([]models.TaskWorkerResponse{
{Identifier: "worker-1", TaskKeys: []string{"key1"}},
})
}))
defer server.Close()
client, _ := NewClient(WithBaseURL(server.URL + "/api"))
ctx := context.Background()
workers, err := client.TaskWorkers.List(ctx, &models.TaskWorkerFilter{TaskKeys: []string{"key1"}})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if len(workers) != 1 {
t.Errorf("count = %v, want 1", len(workers))
}
if workers[0].Identifier != "worker-1" {
t.Errorf("Identifier = %v, want worker-1", workers[0].Identifier)
}
}