From 2dcc0b13dd03ad099c5bed39a938b877d1b0d658 Mon Sep 17 00:00:00 2001 From: Gregor Schulte Date: Wed, 8 Apr 2026 16:07:44 +0200 Subject: [PATCH] =?UTF-8?q?Extrahiere=20Deployments-,=20TaskRun-=20und=20W?= =?UTF-8?q?orkPool-Services=20in=20separate=20Dateien;=20entferne=20nicht?= =?UTF-8?q?=20ben=C3=B6tigte=20Services=20und=20Funktionen.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkg/client/admin.go | 30 ++ pkg/client/deployments.go | 184 ++++++++++++ pkg/client/logs.go | 55 ++++ pkg/client/services.go | 600 -------------------------------------- pkg/client/task_runs.go | 118 ++++++++ pkg/client/variables.go | 111 +++++++ pkg/client/work_pools.go | 78 +++++ pkg/client/work_queues.go | 76 +++++ 8 files changed, 652 insertions(+), 600 deletions(-) create mode 100644 pkg/client/admin.go create mode 100644 pkg/client/deployments.go create mode 100644 pkg/client/logs.go delete mode 100644 pkg/client/services.go create mode 100644 pkg/client/task_runs.go create mode 100644 pkg/client/variables.go create mode 100644 pkg/client/work_pools.go create mode 100644 pkg/client/work_queues.go diff --git a/pkg/client/admin.go b/pkg/client/admin.go new file mode 100644 index 0000000..a5cabeb --- /dev/null +++ b/pkg/client/admin.go @@ -0,0 +1,30 @@ +package client + +import ( + "context" + "fmt" +) + +// AdminService handles administrative operations. +type AdminService struct { + client *Client +} + +// Health checks the health of the Prefect server. +func (a *AdminService) Health(ctx context.Context) error { + if err := a.client.get(ctx, "/health", nil); err != nil { + return fmt.Errorf("health check failed: %w", err) + } + return nil +} + +// Version retrieves the server version. +func (a *AdminService) Version(ctx context.Context) (string, error) { + var result struct { + Version string `json:"version"` + } + if err := a.client.get(ctx, "/version", &result); err != nil { + return "", fmt.Errorf("failed to get version: %w", err) + } + return result.Version, nil +} diff --git a/pkg/client/deployments.go b/pkg/client/deployments.go new file mode 100644 index 0000000..5fdb891 --- /dev/null +++ b/pkg/client/deployments.go @@ -0,0 +1,184 @@ +package client + +import ( + "context" + "fmt" + + "git.schultes.dev/schultesdev/prefect-go/pkg/models" + "git.schultes.dev/schultesdev/prefect-go/pkg/pagination" + "github.com/google/uuid" +) + +// DeploymentsService handles operations related to deployments. +type DeploymentsService struct { + client *Client +} + +// Create creates a new deployment. +func (s *DeploymentsService) Create(ctx context.Context, req *models.DeploymentCreate) (*models.Deployment, error) { + var deployment models.Deployment + if err := s.client.post(ctx, "/deployments/", req, &deployment); err != nil { + return nil, fmt.Errorf("failed to create deployment: %w", err) + } + return &deployment, nil +} + +// Get retrieves a deployment by ID. +func (s *DeploymentsService) Get(ctx context.Context, id uuid.UUID) (*models.Deployment, error) { + var deployment models.Deployment + path := joinPath("/deployments", id.String()) + if err := s.client.get(ctx, path, &deployment); err != nil { + return nil, fmt.Errorf("failed to get deployment: %w", err) + } + return &deployment, nil +} + +// GetByName retrieves a deployment by flow name and deployment name. +func (s *DeploymentsService) GetByName(ctx context.Context, flowName, deploymentName string) (*models.Deployment, error) { + var deployment models.Deployment + path := fmt.Sprintf("/deployments/name/%s/%s", flowName, deploymentName) + if err := s.client.get(ctx, path, &deployment); err != nil { + return nil, fmt.Errorf("failed to get deployment by name: %w", err) + } + return &deployment, nil +} + +// Update updates a deployment. +func (s *DeploymentsService) Update(ctx context.Context, id uuid.UUID, req *models.DeploymentUpdate) (*models.Deployment, error) { + var deployment models.Deployment + path := joinPath("/deployments", id.String()) + if err := s.client.patch(ctx, path, req, &deployment); err != nil { + return nil, fmt.Errorf("failed to update deployment: %w", err) + } + return &deployment, nil +} + +// Delete deletes a deployment by ID. +func (s *DeploymentsService) Delete(ctx context.Context, id uuid.UUID) error { + path := joinPath("/deployments", id.String()) + if err := s.client.delete(ctx, path); err != nil { + return fmt.Errorf("failed to delete deployment: %w", err) + } + return nil +} + +// Pause pauses a deployment's schedule. +func (s *DeploymentsService) Pause(ctx context.Context, id uuid.UUID) error { + path := joinPath("/deployments", id.String(), "pause") + if err := s.client.post(ctx, path, nil, nil); err != nil { + return fmt.Errorf("failed to pause deployment: %w", err) + } + return nil +} + +// Resume resumes a deployment's schedule. +func (s *DeploymentsService) Resume(ctx context.Context, id uuid.UUID) error { + path := joinPath("/deployments", id.String(), "resume") + if err := s.client.post(ctx, path, nil, nil); err != nil { + return fmt.Errorf("failed to resume deployment: %w", err) + } + return nil +} + +// CreateFlowRun creates a flow run from a deployment. +func (s *DeploymentsService) CreateFlowRun(ctx context.Context, id uuid.UUID, params map[string]interface{}) (*models.FlowRun, error) { + var flowRun models.FlowRun + path := joinPath("/deployments", id.String(), "create_flow_run") + + req := struct { + Parameters map[string]interface{} `json:"parameters,omitempty"` + }{ + Parameters: params, + } + + if err := s.client.post(ctx, path, req, &flowRun); err != nil { + return nil, fmt.Errorf("failed to create flow run from deployment: %w", err) + } + return &flowRun, nil +} + +// List retrieves a list of deployments with optional filtering. +func (s *DeploymentsService) List(ctx context.Context, filter *models.DeploymentFilter, offset, limit int) (*pagination.PaginatedResponse[models.Deployment], error) { + if filter == nil { + filter = &models.DeploymentFilter{} + } + filter.Offset = offset + filter.Limit = limit + + type response struct { + Results []models.Deployment `json:"results"` + Count int `json:"count"` + } + + var resp response + if err := s.client.post(ctx, "/deployments/filter", filter, &resp); err != nil { + return nil, fmt.Errorf("failed to list deployments: %w", err) + } + + return &pagination.PaginatedResponse[models.Deployment]{ + Results: resp.Results, + Count: resp.Count, + Limit: limit, + Offset: offset, + HasMore: offset+len(resp.Results) < resp.Count, + }, nil +} + +// ListAll returns an iterator for all deployments matching the filter. +func (s *DeploymentsService) ListAll(ctx context.Context, filter *models.DeploymentFilter) *pagination.Iterator[models.Deployment] { + fetchFunc := func(ctx context.Context, offset, limit int) (*pagination.PaginatedResponse[models.Deployment], error) { + return s.List(ctx, filter, offset, limit) + } + return pagination.NewIterator(fetchFunc, 100) +} + +// Count returns the number of deployments matching the filter. +func (s *DeploymentsService) Count(ctx context.Context, filter *models.DeploymentFilter) (int, error) { + if filter == nil { + filter = &models.DeploymentFilter{} + } + + var count int + if err := s.client.post(ctx, "/deployments/count", filter, &count); err != nil { + return 0, fmt.Errorf("failed to count deployments: %w", err) + } + return count, nil +} + +// GetSchedules retrieves all schedules for a deployment. +func (s *DeploymentsService) GetSchedules(ctx context.Context, id uuid.UUID) ([]models.DeploymentSchedule, error) { + var schedules []models.DeploymentSchedule + path := joinPath("/deployments", id.String(), "schedules") + if err := s.client.get(ctx, path, &schedules); err != nil { + return nil, fmt.Errorf("failed to get deployment schedules: %w", err) + } + return schedules, nil +} + +// CreateSchedules creates schedules for a deployment. +func (s *DeploymentsService) CreateSchedules(ctx context.Context, id uuid.UUID, schedules []models.DeploymentScheduleCreate) ([]models.DeploymentSchedule, error) { + var result []models.DeploymentSchedule + path := joinPath("/deployments", id.String(), "schedules") + if err := s.client.post(ctx, path, schedules, &result); err != nil { + return nil, fmt.Errorf("failed to create deployment schedules: %w", err) + } + return result, nil +} + +// UpdateSchedule updates a specific schedule for a deployment. +func (s *DeploymentsService) UpdateSchedule(ctx context.Context, deploymentID, scheduleID uuid.UUID, req *models.DeploymentScheduleUpdate) error { + path := joinPath("/deployments", deploymentID.String(), "schedules", scheduleID.String()) + if err := s.client.patch(ctx, path, req, nil); err != nil { + return fmt.Errorf("failed to update deployment schedule: %w", err) + } + return nil +} + +// DeleteSchedule deletes a specific schedule from a deployment. +func (s *DeploymentsService) DeleteSchedule(ctx context.Context, deploymentID, scheduleID uuid.UUID) error { + path := joinPath("/deployments", deploymentID.String(), "schedules", scheduleID.String()) + if err := s.client.delete(ctx, path); err != nil { + return fmt.Errorf("failed to delete deployment schedule: %w", err) + } + return nil +} diff --git a/pkg/client/logs.go b/pkg/client/logs.go new file mode 100644 index 0000000..74da163 --- /dev/null +++ b/pkg/client/logs.go @@ -0,0 +1,55 @@ +package client + +import ( + "context" + "fmt" + + "git.schultes.dev/schultesdev/prefect-go/pkg/models" + "git.schultes.dev/schultesdev/prefect-go/pkg/pagination" +) + +// LogsService handles operations related to logs. +type LogsService struct { + client *Client +} + +// Create creates new log entries. +func (l *LogsService) Create(ctx context.Context, logs []*models.LogCreate) error { + if err := l.client.post(ctx, "/logs/", logs, nil); err != nil { + return fmt.Errorf("failed to create logs: %w", err) + } + return nil +} + +// List retrieves logs with filtering. +func (l *LogsService) List(ctx context.Context, filter interface{}, offset, limit int) (*pagination.PaginatedResponse[models.Log], error) { + type request struct { + Filter interface{} `json:"filter,omitempty"` + Offset int `json:"offset"` + Limit int `json:"limit"` + } + + req := request{ + Filter: filter, + Offset: offset, + Limit: limit, + } + + type response struct { + Results []models.Log `json:"results"` + Count int `json:"count"` + } + + var resp response + if err := l.client.post(ctx, "/logs/filter", req, &resp); err != nil { + return nil, fmt.Errorf("failed to list logs: %w", err) + } + + return &pagination.PaginatedResponse[models.Log]{ + Results: resp.Results, + Count: resp.Count, + Limit: limit, + Offset: offset, + HasMore: offset+len(resp.Results) < resp.Count, + }, nil +} diff --git a/pkg/client/services.go b/pkg/client/services.go deleted file mode 100644 index 6111348..0000000 --- a/pkg/client/services.go +++ /dev/null @@ -1,600 +0,0 @@ -package client - -import ( - "context" - "fmt" - - "git.schultes.dev/schultesdev/prefect-go/pkg/models" - "git.schultes.dev/schultesdev/prefect-go/pkg/pagination" - "github.com/google/uuid" -) - -// DeploymentsService handles operations related to deployments. -type DeploymentsService struct { - client *Client -} - -// Create creates a new deployment. -func (s *DeploymentsService) Create(ctx context.Context, req *models.DeploymentCreate) (*models.Deployment, error) { - var deployment models.Deployment - if err := s.client.post(ctx, "/deployments/", req, &deployment); err != nil { - return nil, fmt.Errorf("failed to create deployment: %w", err) - } - return &deployment, nil -} - -// Get retrieves a deployment by ID. -func (s *DeploymentsService) Get(ctx context.Context, id uuid.UUID) (*models.Deployment, error) { - var deployment models.Deployment - path := joinPath("/deployments", id.String()) - if err := s.client.get(ctx, path, &deployment); err != nil { - return nil, fmt.Errorf("failed to get deployment: %w", err) - } - return &deployment, nil -} - -// GetByName retrieves a deployment by flow name and deployment name. -func (s *DeploymentsService) GetByName(ctx context.Context, flowName, deploymentName string) (*models.Deployment, error) { - var deployment models.Deployment - path := fmt.Sprintf("/deployments/name/%s/%s", flowName, deploymentName) - if err := s.client.get(ctx, path, &deployment); err != nil { - return nil, fmt.Errorf("failed to get deployment by name: %w", err) - } - return &deployment, nil -} - -// Update updates a deployment. -func (s *DeploymentsService) Update(ctx context.Context, id uuid.UUID, req *models.DeploymentUpdate) (*models.Deployment, error) { - var deployment models.Deployment - path := joinPath("/deployments", id.String()) - if err := s.client.patch(ctx, path, req, &deployment); err != nil { - return nil, fmt.Errorf("failed to update deployment: %w", err) - } - return &deployment, nil -} - -// Delete deletes a deployment by ID. -func (s *DeploymentsService) Delete(ctx context.Context, id uuid.UUID) error { - path := joinPath("/deployments", id.String()) - if err := s.client.delete(ctx, path); err != nil { - return fmt.Errorf("failed to delete deployment: %w", err) - } - return nil -} - -// Pause pauses a deployment's schedule. -func (s *DeploymentsService) Pause(ctx context.Context, id uuid.UUID) error { - path := joinPath("/deployments", id.String(), "pause") - if err := s.client.post(ctx, path, nil, nil); err != nil { - return fmt.Errorf("failed to pause deployment: %w", err) - } - return nil -} - -// Resume resumes a deployment's schedule. -func (s *DeploymentsService) Resume(ctx context.Context, id uuid.UUID) error { - path := joinPath("/deployments", id.String(), "resume") - if err := s.client.post(ctx, path, nil, nil); err != nil { - return fmt.Errorf("failed to resume deployment: %w", err) - } - return nil -} - -// CreateFlowRun creates a flow run from a deployment. -func (s *DeploymentsService) CreateFlowRun(ctx context.Context, id uuid.UUID, params map[string]interface{}) (*models.FlowRun, error) { - var flowRun models.FlowRun - path := joinPath("/deployments", id.String(), "create_flow_run") - - req := struct { - Parameters map[string]interface{} `json:"parameters,omitempty"` - }{ - Parameters: params, - } - - if err := s.client.post(ctx, path, req, &flowRun); err != nil { - return nil, fmt.Errorf("failed to create flow run from deployment: %w", err) - } - return &flowRun, nil -} - -// List retrieves a list of deployments with optional filtering. -func (s *DeploymentsService) List(ctx context.Context, filter *models.DeploymentFilter, offset, limit int) (*pagination.PaginatedResponse[models.Deployment], error) { - if filter == nil { - filter = &models.DeploymentFilter{} - } - filter.Offset = offset - filter.Limit = limit - - type response struct { - Results []models.Deployment `json:"results"` - Count int `json:"count"` - } - - var resp response - if err := s.client.post(ctx, "/deployments/filter", filter, &resp); err != nil { - return nil, fmt.Errorf("failed to list deployments: %w", err) - } - - return &pagination.PaginatedResponse[models.Deployment]{ - Results: resp.Results, - Count: resp.Count, - Limit: limit, - Offset: offset, - HasMore: offset+len(resp.Results) < resp.Count, - }, nil -} - -// ListAll returns an iterator for all deployments matching the filter. -func (s *DeploymentsService) ListAll(ctx context.Context, filter *models.DeploymentFilter) *pagination.Iterator[models.Deployment] { - fetchFunc := func(ctx context.Context, offset, limit int) (*pagination.PaginatedResponse[models.Deployment], error) { - return s.List(ctx, filter, offset, limit) - } - return pagination.NewIterator(fetchFunc, 100) -} - -// Count returns the number of deployments matching the filter. -func (s *DeploymentsService) Count(ctx context.Context, filter *models.DeploymentFilter) (int, error) { - if filter == nil { - filter = &models.DeploymentFilter{} - } - - var count int - if err := s.client.post(ctx, "/deployments/count", filter, &count); err != nil { - return 0, fmt.Errorf("failed to count deployments: %w", err) - } - return count, nil -} - -// GetSchedules retrieves all schedules for a deployment. -func (s *DeploymentsService) GetSchedules(ctx context.Context, id uuid.UUID) ([]models.DeploymentSchedule, error) { - var schedules []models.DeploymentSchedule - path := joinPath("/deployments", id.String(), "schedules") - if err := s.client.get(ctx, path, &schedules); err != nil { - return nil, fmt.Errorf("failed to get deployment schedules: %w", err) - } - return schedules, nil -} - -// CreateSchedules creates schedules for a deployment. -func (s *DeploymentsService) CreateSchedules(ctx context.Context, id uuid.UUID, schedules []models.DeploymentScheduleCreate) ([]models.DeploymentSchedule, error) { - var result []models.DeploymentSchedule - path := joinPath("/deployments", id.String(), "schedules") - if err := s.client.post(ctx, path, schedules, &result); err != nil { - return nil, fmt.Errorf("failed to create deployment schedules: %w", err) - } - return result, nil -} - -// UpdateSchedule updates a specific schedule for a deployment. -func (s *DeploymentsService) UpdateSchedule(ctx context.Context, deploymentID, scheduleID uuid.UUID, req *models.DeploymentScheduleUpdate) error { - path := joinPath("/deployments", deploymentID.String(), "schedules", scheduleID.String()) - if err := s.client.patch(ctx, path, req, nil); err != nil { - return fmt.Errorf("failed to update deployment schedule: %w", err) - } - return nil -} - -// DeleteSchedule deletes a specific schedule from a deployment. -func (s *DeploymentsService) DeleteSchedule(ctx context.Context, deploymentID, scheduleID uuid.UUID) error { - path := joinPath("/deployments", deploymentID.String(), "schedules", scheduleID.String()) - if err := s.client.delete(ctx, path); err != nil { - return fmt.Errorf("failed to delete deployment schedule: %w", err) - } - return nil -} - -// TaskRunsService handles operations related to task runs. -type TaskRunsService struct { - client *Client -} - -// Create creates a new task run. -func (t *TaskRunsService) Create(ctx context.Context, req *models.TaskRunCreate) (*models.TaskRun, error) { - var taskRun models.TaskRun - if err := t.client.post(ctx, "/task_runs/", req, &taskRun); err != nil { - return nil, fmt.Errorf("failed to create task run: %w", err) - } - return &taskRun, nil -} - -// Get retrieves a task run by ID. -func (t *TaskRunsService) Get(ctx context.Context, id uuid.UUID) (*models.TaskRun, error) { - var taskRun models.TaskRun - path := joinPath("/task_runs", id.String()) - if err := t.client.get(ctx, path, &taskRun); err != nil { - return nil, fmt.Errorf("failed to get task run: %w", err) - } - return &taskRun, nil -} - -// Delete deletes a task run by ID. -func (t *TaskRunsService) Delete(ctx context.Context, id uuid.UUID) error { - path := joinPath("/task_runs", id.String()) - if err := t.client.delete(ctx, path); err != nil { - return fmt.Errorf("failed to delete task run: %w", err) - } - return nil -} - -// SetState sets the state of a task run. -func (t *TaskRunsService) SetState(ctx context.Context, id uuid.UUID, state *models.StateCreate) (*models.TaskRun, error) { - var taskRun models.TaskRun - path := joinPath("/task_runs", id.String(), "set_state") - - req := struct { - State *models.StateCreate `json:"state"` - }{ - State: state, - } - - if err := t.client.post(ctx, path, req, &taskRun); err != nil { - return nil, fmt.Errorf("failed to set task run state: %w", err) - } - return &taskRun, nil -} - -// Update updates a task run. -func (t *TaskRunsService) Update(ctx context.Context, id uuid.UUID, req *models.TaskRunUpdate) (*models.TaskRun, error) { - var taskRun models.TaskRun - path := joinPath("/task_runs", id.String()) - if err := t.client.patch(ctx, path, req, &taskRun); err != nil { - return nil, fmt.Errorf("failed to update task run: %w", err) - } - return &taskRun, nil -} - -// List retrieves a list of task runs with optional filtering. -func (t *TaskRunsService) List(ctx context.Context, filter *models.TaskRunFilter, offset, limit int) (*pagination.PaginatedResponse[models.TaskRun], error) { - if filter == nil { - filter = &models.TaskRunFilter{} - } - filter.Offset = offset - filter.Limit = limit - - type response struct { - Results []models.TaskRun `json:"results"` - Count int `json:"count"` - } - - var resp response - if err := t.client.post(ctx, "/task_runs/filter", filter, &resp); err != nil { - return nil, fmt.Errorf("failed to list task runs: %w", err) - } - - return &pagination.PaginatedResponse[models.TaskRun]{ - Results: resp.Results, - Count: resp.Count, - Limit: limit, - Offset: offset, - HasMore: offset+len(resp.Results) < resp.Count, - }, nil -} - -// ListAll returns an iterator for all task runs matching the filter. -func (t *TaskRunsService) ListAll(ctx context.Context, filter *models.TaskRunFilter) *pagination.Iterator[models.TaskRun] { - fetchFunc := func(ctx context.Context, offset, limit int) (*pagination.PaginatedResponse[models.TaskRun], error) { - return t.List(ctx, filter, offset, limit) - } - return pagination.NewIterator(fetchFunc, 100) -} - -// Count returns the number of task runs matching the filter. -func (t *TaskRunsService) Count(ctx context.Context, filter *models.TaskRunFilter) (int, error) { - if filter == nil { - filter = &models.TaskRunFilter{} - } - - var count int - if err := t.client.post(ctx, "/task_runs/count", filter, &count); err != nil { - return 0, fmt.Errorf("failed to count task runs: %w", err) - } - return count, nil -} - -// WorkPoolsService handles operations related to work pools. -type WorkPoolsService struct { - client *Client -} - -// Get retrieves a work pool by name. -func (w *WorkPoolsService) Get(ctx context.Context, name string) (*models.WorkPool, error) { - var workPool models.WorkPool - path := joinPath("/work_pools", name) - if err := w.client.get(ctx, path, &workPool); err != nil { - return nil, fmt.Errorf("failed to get work pool: %w", err) - } - return &workPool, nil -} - -// Create creates a new work pool. -func (w *WorkPoolsService) Create(ctx context.Context, req *models.WorkPoolCreate) (*models.WorkPool, error) { - var workPool models.WorkPool - if err := w.client.post(ctx, "/work_pools/", req, &workPool); err != nil { - return nil, fmt.Errorf("failed to create work pool: %w", err) - } - return &workPool, nil -} - -// Update updates a work pool. -func (w *WorkPoolsService) Update(ctx context.Context, name string, req *models.WorkPoolUpdate) error { - path := joinPath("/work_pools", name) - if err := w.client.patch(ctx, path, req, nil); err != nil { - return fmt.Errorf("failed to update work pool: %w", err) - } - return nil -} - -// Delete deletes a work pool by name. -func (w *WorkPoolsService) Delete(ctx context.Context, name string) error { - path := joinPath("/work_pools", name) - if err := w.client.delete(ctx, path); err != nil { - return fmt.Errorf("failed to delete work pool: %w", err) - } - return nil -} - -// List retrieves work pools with optional filtering. -func (w *WorkPoolsService) List(ctx context.Context, filter *models.WorkPoolFilter, offset, limit int) ([]models.WorkPool, error) { - if filter == nil { - filter = &models.WorkPoolFilter{} - } - filter.Offset = offset - filter.Limit = limit - - var workPools []models.WorkPool - if err := w.client.post(ctx, "/work_pools/filter", filter, &workPools); err != nil { - return nil, fmt.Errorf("failed to list work pools: %w", err) - } - return workPools, nil -} - -// Count returns the number of work pools matching the filter. -func (w *WorkPoolsService) Count(ctx context.Context, filter *models.WorkPoolFilter) (int, error) { - if filter == nil { - filter = &models.WorkPoolFilter{} - } - - var count int - if err := w.client.post(ctx, "/work_pools/count", filter, &count); err != nil { - return 0, fmt.Errorf("failed to count work pools: %w", err) - } - return count, nil -} - -// WorkQueuesService handles operations related to work queues. -type WorkQueuesService struct { - client *Client -} - -// Get retrieves a work queue by ID. -func (w *WorkQueuesService) Get(ctx context.Context, id uuid.UUID) (*models.WorkQueue, error) { - var workQueue models.WorkQueue - path := joinPath("/work_queues", id.String()) - if err := w.client.get(ctx, path, &workQueue); err != nil { - return nil, fmt.Errorf("failed to get work queue: %w", err) - } - return &workQueue, nil -} - -// Create creates a new work queue. -func (w *WorkQueuesService) Create(ctx context.Context, req *models.WorkQueueCreate) (*models.WorkQueue, error) { - var workQueue models.WorkQueue - if err := w.client.post(ctx, "/work_queues/", req, &workQueue); err != nil { - return nil, fmt.Errorf("failed to create work queue: %w", err) - } - return &workQueue, nil -} - -// GetByName retrieves a work queue by name. -func (w *WorkQueuesService) GetByName(ctx context.Context, name string) (*models.WorkQueue, error) { - var workQueue models.WorkQueue - path := joinPath("/work_queues/name", name) - if err := w.client.get(ctx, path, &workQueue); err != nil { - return nil, fmt.Errorf("failed to get work queue by name: %w", err) - } - return &workQueue, nil -} - -// Update updates a work queue. -func (w *WorkQueuesService) Update(ctx context.Context, id uuid.UUID, req *models.WorkQueueUpdate) error { - path := joinPath("/work_queues", id.String()) - if err := w.client.patch(ctx, path, req, nil); err != nil { - return fmt.Errorf("failed to update work queue: %w", err) - } - return nil -} - -// Delete deletes a work queue by ID. -func (w *WorkQueuesService) Delete(ctx context.Context, id uuid.UUID) error { - path := joinPath("/work_queues", id.String()) - if err := w.client.delete(ctx, path); err != nil { - return fmt.Errorf("failed to delete work queue: %w", err) - } - return nil -} - -// List retrieves work queues with optional filtering. -func (w *WorkQueuesService) List(ctx context.Context, filter *models.WorkQueueFilter, offset, limit int) ([]models.WorkQueue, error) { - if filter == nil { - filter = &models.WorkQueueFilter{} - } - filter.Offset = offset - filter.Limit = limit - - var workQueues []models.WorkQueue - if err := w.client.post(ctx, "/work_queues/filter", filter, &workQueues); err != nil { - return nil, fmt.Errorf("failed to list work queues: %w", err) - } - return workQueues, nil -} - -// VariablesService handles operations related to variables. -type VariablesService struct { - client *Client -} - -// Create creates a new variable. -func (v *VariablesService) Create(ctx context.Context, req *models.VariableCreate) (*models.Variable, error) { - var variable models.Variable - if err := v.client.post(ctx, "/variables/", req, &variable); err != nil { - return nil, fmt.Errorf("failed to create variable: %w", err) - } - return &variable, nil -} - -// Get retrieves a variable by ID. -func (v *VariablesService) Get(ctx context.Context, id uuid.UUID) (*models.Variable, error) { - var variable models.Variable - path := joinPath("/variables", id.String()) - if err := v.client.get(ctx, path, &variable); err != nil { - return nil, fmt.Errorf("failed to get variable: %w", err) - } - return &variable, nil -} - -// GetByName retrieves a variable by name. -func (v *VariablesService) GetByName(ctx context.Context, name string) (*models.Variable, error) { - var variable models.Variable - path := joinPath("/variables/name", name) - if err := v.client.get(ctx, path, &variable); err != nil { - return nil, fmt.Errorf("failed to get variable by name: %w", err) - } - return &variable, nil -} - -// Update updates a variable. -func (v *VariablesService) Update(ctx context.Context, id uuid.UUID, req *models.VariableUpdate) (*models.Variable, error) { - var variable models.Variable - path := joinPath("/variables", id.String()) - if err := v.client.patch(ctx, path, req, &variable); err != nil { - return nil, fmt.Errorf("failed to update variable: %w", err) - } - return &variable, nil -} - -// Delete deletes a variable by ID. -func (v *VariablesService) Delete(ctx context.Context, id uuid.UUID) error { - path := joinPath("/variables", id.String()) - if err := v.client.delete(ctx, path); err != nil { - return fmt.Errorf("failed to delete variable: %w", err) - } - return nil -} - -// List retrieves a list of variables with optional filtering. -func (v *VariablesService) List(ctx context.Context, filter *models.VariableFilter, offset, limit int) (*pagination.PaginatedResponse[models.Variable], error) { - if filter == nil { - filter = &models.VariableFilter{} - } - filter.Offset = offset - filter.Limit = limit - - type response struct { - Results []models.Variable `json:"results"` - Count int `json:"count"` - } - - var resp response - if err := v.client.post(ctx, "/variables/filter", filter, &resp); err != nil { - return nil, fmt.Errorf("failed to list variables: %w", err) - } - - return &pagination.PaginatedResponse[models.Variable]{ - Results: resp.Results, - Count: resp.Count, - Limit: limit, - Offset: offset, - HasMore: offset+len(resp.Results) < resp.Count, - }, nil -} - -// ListAll returns an iterator for all variables matching the filter. -func (v *VariablesService) ListAll(ctx context.Context, filter *models.VariableFilter) *pagination.Iterator[models.Variable] { - fetchFunc := func(ctx context.Context, offset, limit int) (*pagination.PaginatedResponse[models.Variable], error) { - return v.List(ctx, filter, offset, limit) - } - return pagination.NewIterator(fetchFunc, 100) -} - -// Count returns the number of variables matching the filter. -func (v *VariablesService) Count(ctx context.Context, filter *models.VariableFilter) (int, error) { - if filter == nil { - filter = &models.VariableFilter{} - } - - var count int - if err := v.client.post(ctx, "/variables/count", filter, &count); err != nil { - return 0, fmt.Errorf("failed to count variables: %w", err) - } - return count, nil -} - -// LogsService handles operations related to logs. -type LogsService struct { - client *Client -} - -// Create creates new log entries. -func (l *LogsService) Create(ctx context.Context, logs []*models.LogCreate) error { - if err := l.client.post(ctx, "/logs/", logs, nil); err != nil { - return fmt.Errorf("failed to create logs: %w", err) - } - return nil -} - -// List retrieves logs with filtering. -func (l *LogsService) List(ctx context.Context, filter interface{}, offset, limit int) (*pagination.PaginatedResponse[models.Log], error) { - type request struct { - Filter interface{} `json:"filter,omitempty"` - Offset int `json:"offset"` - Limit int `json:"limit"` - } - - req := request{ - Filter: filter, - Offset: offset, - Limit: limit, - } - - type response struct { - Results []models.Log `json:"results"` - Count int `json:"count"` - } - - var resp response - if err := l.client.post(ctx, "/logs/filter", req, &resp); err != nil { - return nil, fmt.Errorf("failed to list logs: %w", err) - } - - return &pagination.PaginatedResponse[models.Log]{ - Results: resp.Results, - Count: resp.Count, - Limit: limit, - Offset: offset, - HasMore: offset+len(resp.Results) < resp.Count, - }, nil -} - -// AdminService handles administrative operations. -type AdminService struct { - client *Client -} - -// Health checks the health of the Prefect server. -func (a *AdminService) Health(ctx context.Context) error { - if err := a.client.get(ctx, "/health", nil); err != nil { - return fmt.Errorf("health check failed: %w", err) - } - return nil -} - -// Version retrieves the server version. -func (a *AdminService) Version(ctx context.Context) (string, error) { - var result struct { - Version string `json:"version"` - } - if err := a.client.get(ctx, "/version", &result); err != nil { - return "", fmt.Errorf("failed to get version: %w", err) - } - return result.Version, nil -} diff --git a/pkg/client/task_runs.go b/pkg/client/task_runs.go new file mode 100644 index 0000000..e9ac6aa --- /dev/null +++ b/pkg/client/task_runs.go @@ -0,0 +1,118 @@ +package client + +import ( + "context" + "fmt" + + "git.schultes.dev/schultesdev/prefect-go/pkg/models" + "git.schultes.dev/schultesdev/prefect-go/pkg/pagination" + "github.com/google/uuid" +) + +// TaskRunsService handles operations related to task runs. +type TaskRunsService struct { + client *Client +} + +// Create creates a new task run. +func (t *TaskRunsService) Create(ctx context.Context, req *models.TaskRunCreate) (*models.TaskRun, error) { + var taskRun models.TaskRun + if err := t.client.post(ctx, "/task_runs/", req, &taskRun); err != nil { + return nil, fmt.Errorf("failed to create task run: %w", err) + } + return &taskRun, nil +} + +// Get retrieves a task run by ID. +func (t *TaskRunsService) Get(ctx context.Context, id uuid.UUID) (*models.TaskRun, error) { + var taskRun models.TaskRun + path := joinPath("/task_runs", id.String()) + if err := t.client.get(ctx, path, &taskRun); err != nil { + return nil, fmt.Errorf("failed to get task run: %w", err) + } + return &taskRun, nil +} + +// Delete deletes a task run by ID. +func (t *TaskRunsService) Delete(ctx context.Context, id uuid.UUID) error { + path := joinPath("/task_runs", id.String()) + if err := t.client.delete(ctx, path); err != nil { + return fmt.Errorf("failed to delete task run: %w", err) + } + return nil +} + +// SetState sets the state of a task run. +func (t *TaskRunsService) SetState(ctx context.Context, id uuid.UUID, state *models.StateCreate) (*models.TaskRun, error) { + var taskRun models.TaskRun + path := joinPath("/task_runs", id.String(), "set_state") + + req := struct { + State *models.StateCreate `json:"state"` + }{ + State: state, + } + + if err := t.client.post(ctx, path, req, &taskRun); err != nil { + return nil, fmt.Errorf("failed to set task run state: %w", err) + } + return &taskRun, nil +} + +// Update updates a task run. +func (t *TaskRunsService) Update(ctx context.Context, id uuid.UUID, req *models.TaskRunUpdate) (*models.TaskRun, error) { + var taskRun models.TaskRun + path := joinPath("/task_runs", id.String()) + if err := t.client.patch(ctx, path, req, &taskRun); err != nil { + return nil, fmt.Errorf("failed to update task run: %w", err) + } + return &taskRun, nil +} + +// List retrieves a list of task runs with optional filtering. +func (t *TaskRunsService) List(ctx context.Context, filter *models.TaskRunFilter, offset, limit int) (*pagination.PaginatedResponse[models.TaskRun], error) { + if filter == nil { + filter = &models.TaskRunFilter{} + } + filter.Offset = offset + filter.Limit = limit + + type response struct { + Results []models.TaskRun `json:"results"` + Count int `json:"count"` + } + + var resp response + if err := t.client.post(ctx, "/task_runs/filter", filter, &resp); err != nil { + return nil, fmt.Errorf("failed to list task runs: %w", err) + } + + return &pagination.PaginatedResponse[models.TaskRun]{ + Results: resp.Results, + Count: resp.Count, + Limit: limit, + Offset: offset, + HasMore: offset+len(resp.Results) < resp.Count, + }, nil +} + +// ListAll returns an iterator for all task runs matching the filter. +func (t *TaskRunsService) ListAll(ctx context.Context, filter *models.TaskRunFilter) *pagination.Iterator[models.TaskRun] { + fetchFunc := func(ctx context.Context, offset, limit int) (*pagination.PaginatedResponse[models.TaskRun], error) { + return t.List(ctx, filter, offset, limit) + } + return pagination.NewIterator(fetchFunc, 100) +} + +// Count returns the number of task runs matching the filter. +func (t *TaskRunsService) Count(ctx context.Context, filter *models.TaskRunFilter) (int, error) { + if filter == nil { + filter = &models.TaskRunFilter{} + } + + var count int + if err := t.client.post(ctx, "/task_runs/count", filter, &count); err != nil { + return 0, fmt.Errorf("failed to count task runs: %w", err) + } + return count, nil +} diff --git a/pkg/client/variables.go b/pkg/client/variables.go new file mode 100644 index 0000000..44702e3 --- /dev/null +++ b/pkg/client/variables.go @@ -0,0 +1,111 @@ +package client + +import ( + "context" + "fmt" + + "git.schultes.dev/schultesdev/prefect-go/pkg/models" + "git.schultes.dev/schultesdev/prefect-go/pkg/pagination" + "github.com/google/uuid" +) + +// VariablesService handles operations related to variables. +type VariablesService struct { + client *Client +} + +// Create creates a new variable. +func (v *VariablesService) Create(ctx context.Context, req *models.VariableCreate) (*models.Variable, error) { + var variable models.Variable + if err := v.client.post(ctx, "/variables/", req, &variable); err != nil { + return nil, fmt.Errorf("failed to create variable: %w", err) + } + return &variable, nil +} + +// Get retrieves a variable by ID. +func (v *VariablesService) Get(ctx context.Context, id uuid.UUID) (*models.Variable, error) { + var variable models.Variable + path := joinPath("/variables", id.String()) + if err := v.client.get(ctx, path, &variable); err != nil { + return nil, fmt.Errorf("failed to get variable: %w", err) + } + return &variable, nil +} + +// GetByName retrieves a variable by name. +func (v *VariablesService) GetByName(ctx context.Context, name string) (*models.Variable, error) { + var variable models.Variable + path := joinPath("/variables/name", name) + if err := v.client.get(ctx, path, &variable); err != nil { + return nil, fmt.Errorf("failed to get variable by name: %w", err) + } + return &variable, nil +} + +// Update updates a variable. +func (v *VariablesService) Update(ctx context.Context, id uuid.UUID, req *models.VariableUpdate) (*models.Variable, error) { + var variable models.Variable + path := joinPath("/variables", id.String()) + if err := v.client.patch(ctx, path, req, &variable); err != nil { + return nil, fmt.Errorf("failed to update variable: %w", err) + } + return &variable, nil +} + +// Delete deletes a variable by ID. +func (v *VariablesService) Delete(ctx context.Context, id uuid.UUID) error { + path := joinPath("/variables", id.String()) + if err := v.client.delete(ctx, path); err != nil { + return fmt.Errorf("failed to delete variable: %w", err) + } + return nil +} + +// List retrieves a list of variables with optional filtering. +func (v *VariablesService) List(ctx context.Context, filter *models.VariableFilter, offset, limit int) (*pagination.PaginatedResponse[models.Variable], error) { + if filter == nil { + filter = &models.VariableFilter{} + } + filter.Offset = offset + filter.Limit = limit + + type response struct { + Results []models.Variable `json:"results"` + Count int `json:"count"` + } + + var resp response + if err := v.client.post(ctx, "/variables/filter", filter, &resp); err != nil { + return nil, fmt.Errorf("failed to list variables: %w", err) + } + + return &pagination.PaginatedResponse[models.Variable]{ + Results: resp.Results, + Count: resp.Count, + Limit: limit, + Offset: offset, + HasMore: offset+len(resp.Results) < resp.Count, + }, nil +} + +// ListAll returns an iterator for all variables matching the filter. +func (v *VariablesService) ListAll(ctx context.Context, filter *models.VariableFilter) *pagination.Iterator[models.Variable] { + fetchFunc := func(ctx context.Context, offset, limit int) (*pagination.PaginatedResponse[models.Variable], error) { + return v.List(ctx, filter, offset, limit) + } + return pagination.NewIterator(fetchFunc, 100) +} + +// Count returns the number of variables matching the filter. +func (v *VariablesService) Count(ctx context.Context, filter *models.VariableFilter) (int, error) { + if filter == nil { + filter = &models.VariableFilter{} + } + + var count int + if err := v.client.post(ctx, "/variables/count", filter, &count); err != nil { + return 0, fmt.Errorf("failed to count variables: %w", err) + } + return count, nil +} diff --git a/pkg/client/work_pools.go b/pkg/client/work_pools.go new file mode 100644 index 0000000..563f77e --- /dev/null +++ b/pkg/client/work_pools.go @@ -0,0 +1,78 @@ +package client + +import ( + "context" + "fmt" + + "git.schultes.dev/schultesdev/prefect-go/pkg/models" +) + +// WorkPoolsService handles operations related to work pools. +type WorkPoolsService struct { + client *Client +} + +// Get retrieves a work pool by name. +func (w *WorkPoolsService) Get(ctx context.Context, name string) (*models.WorkPool, error) { + var workPool models.WorkPool + path := joinPath("/work_pools", name) + if err := w.client.get(ctx, path, &workPool); err != nil { + return nil, fmt.Errorf("failed to get work pool: %w", err) + } + return &workPool, nil +} + +// Create creates a new work pool. +func (w *WorkPoolsService) Create(ctx context.Context, req *models.WorkPoolCreate) (*models.WorkPool, error) { + var workPool models.WorkPool + if err := w.client.post(ctx, "/work_pools/", req, &workPool); err != nil { + return nil, fmt.Errorf("failed to create work pool: %w", err) + } + return &workPool, nil +} + +// Update updates a work pool. +func (w *WorkPoolsService) Update(ctx context.Context, name string, req *models.WorkPoolUpdate) error { + path := joinPath("/work_pools", name) + if err := w.client.patch(ctx, path, req, nil); err != nil { + return fmt.Errorf("failed to update work pool: %w", err) + } + return nil +} + +// Delete deletes a work pool by name. +func (w *WorkPoolsService) Delete(ctx context.Context, name string) error { + path := joinPath("/work_pools", name) + if err := w.client.delete(ctx, path); err != nil { + return fmt.Errorf("failed to delete work pool: %w", err) + } + return nil +} + +// List retrieves work pools with optional filtering. +func (w *WorkPoolsService) List(ctx context.Context, filter *models.WorkPoolFilter, offset, limit int) ([]models.WorkPool, error) { + if filter == nil { + filter = &models.WorkPoolFilter{} + } + filter.Offset = offset + filter.Limit = limit + + var workPools []models.WorkPool + if err := w.client.post(ctx, "/work_pools/filter", filter, &workPools); err != nil { + return nil, fmt.Errorf("failed to list work pools: %w", err) + } + return workPools, nil +} + +// Count returns the number of work pools matching the filter. +func (w *WorkPoolsService) Count(ctx context.Context, filter *models.WorkPoolFilter) (int, error) { + if filter == nil { + filter = &models.WorkPoolFilter{} + } + + var count int + if err := w.client.post(ctx, "/work_pools/count", filter, &count); err != nil { + return 0, fmt.Errorf("failed to count work pools: %w", err) + } + return count, nil +} diff --git a/pkg/client/work_queues.go b/pkg/client/work_queues.go new file mode 100644 index 0000000..64dcc0d --- /dev/null +++ b/pkg/client/work_queues.go @@ -0,0 +1,76 @@ +package client + +import ( + "context" + "fmt" + + "git.schultes.dev/schultesdev/prefect-go/pkg/models" + "github.com/google/uuid" +) + +// WorkQueuesService handles operations related to work queues. +type WorkQueuesService struct { + client *Client +} + +// Get retrieves a work queue by ID. +func (w *WorkQueuesService) Get(ctx context.Context, id uuid.UUID) (*models.WorkQueue, error) { + var workQueue models.WorkQueue + path := joinPath("/work_queues", id.String()) + if err := w.client.get(ctx, path, &workQueue); err != nil { + return nil, fmt.Errorf("failed to get work queue: %w", err) + } + return &workQueue, nil +} + +// Create creates a new work queue. +func (w *WorkQueuesService) Create(ctx context.Context, req *models.WorkQueueCreate) (*models.WorkQueue, error) { + var workQueue models.WorkQueue + if err := w.client.post(ctx, "/work_queues/", req, &workQueue); err != nil { + return nil, fmt.Errorf("failed to create work queue: %w", err) + } + return &workQueue, nil +} + +// GetByName retrieves a work queue by name. +func (w *WorkQueuesService) GetByName(ctx context.Context, name string) (*models.WorkQueue, error) { + var workQueue models.WorkQueue + path := joinPath("/work_queues/name", name) + if err := w.client.get(ctx, path, &workQueue); err != nil { + return nil, fmt.Errorf("failed to get work queue by name: %w", err) + } + return &workQueue, nil +} + +// Update updates a work queue. +func (w *WorkQueuesService) Update(ctx context.Context, id uuid.UUID, req *models.WorkQueueUpdate) error { + path := joinPath("/work_queues", id.String()) + if err := w.client.patch(ctx, path, req, nil); err != nil { + return fmt.Errorf("failed to update work queue: %w", err) + } + return nil +} + +// Delete deletes a work queue by ID. +func (w *WorkQueuesService) Delete(ctx context.Context, id uuid.UUID) error { + path := joinPath("/work_queues", id.String()) + if err := w.client.delete(ctx, path); err != nil { + return fmt.Errorf("failed to delete work queue: %w", err) + } + return nil +} + +// List retrieves work queues with optional filtering. +func (w *WorkQueuesService) List(ctx context.Context, filter *models.WorkQueueFilter, offset, limit int) ([]models.WorkQueue, error) { + if filter == nil { + filter = &models.WorkQueueFilter{} + } + filter.Offset = offset + filter.Limit = limit + + var workQueues []models.WorkQueue + if err := w.client.post(ctx, "/work_queues/filter", filter, &workQueues); err != nil { + return nil, fmt.Errorf("failed to list work queues: %w", err) + } + return workQueues, nil +}