142 lines
4.8 KiB
Go
142 lines
4.8 KiB
Go
package client
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net/http"
|
|
|
|
"git.schultes.dev/schultesdev/prefect-go/pkg/models"
|
|
"github.com/google/uuid"
|
|
)
|
|
|
|
// ConcurrencyLimitsV2Service handles operations related to v2 concurrency limits.
|
|
type ConcurrencyLimitsV2Service struct {
|
|
client *Client
|
|
}
|
|
|
|
// Create creates a new v2 concurrency limit.
|
|
func (s *ConcurrencyLimitsV2Service) Create(ctx context.Context, req *models.ConcurrencyLimitV2Create) (*models.ConcurrencyLimitV2, error) {
|
|
var limit models.ConcurrencyLimitV2
|
|
if err := s.client.do(ctx, http.MethodPost, "/v2/concurrency_limits/", req, &limit); err != nil {
|
|
return nil, fmt.Errorf("failed to create v2 concurrency limit: %w", err)
|
|
}
|
|
return &limit, nil
|
|
}
|
|
|
|
// Get retrieves a v2 concurrency limit by ID or name.
|
|
func (s *ConcurrencyLimitsV2Service) Get(ctx context.Context, idOrName string) (*models.ConcurrencyLimitV2, error) {
|
|
var limit models.ConcurrencyLimitV2
|
|
path := joinPath("/v2/concurrency_limits", idOrName)
|
|
if err := s.client.get(ctx, path, &limit); err != nil {
|
|
return nil, fmt.Errorf("failed to get v2 concurrency limit: %w", err)
|
|
}
|
|
return &limit, nil
|
|
}
|
|
|
|
// Update updates a v2 concurrency limit.
|
|
func (s *ConcurrencyLimitsV2Service) Update(ctx context.Context, idOrName string, req *models.ConcurrencyLimitV2Update) error {
|
|
path := joinPath("/v2/concurrency_limits", idOrName)
|
|
if err := s.client.patch(ctx, path, req, nil); err != nil {
|
|
return fmt.Errorf("failed to update v2 concurrency limit: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Delete deletes a v2 concurrency limit by ID or name.
|
|
func (s *ConcurrencyLimitsV2Service) Delete(ctx context.Context, idOrName string) error {
|
|
path := joinPath("/v2/concurrency_limits", idOrName)
|
|
if err := s.client.delete(ctx, path); err != nil {
|
|
return fmt.Errorf("failed to delete v2 concurrency limit: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// List retrieves all v2 concurrency limits.
|
|
func (s *ConcurrencyLimitsV2Service) List(ctx context.Context) ([]models.GlobalConcurrencyLimitResponse, error) {
|
|
var limits []models.GlobalConcurrencyLimitResponse
|
|
if err := s.client.post(ctx, "/v2/concurrency_limits/filter", nil, &limits); err != nil {
|
|
return nil, fmt.Errorf("failed to list v2 concurrency limits: %w", err)
|
|
}
|
|
return limits, nil
|
|
}
|
|
|
|
// Increment increments active slots for v2 concurrency limits.
|
|
func (s *ConcurrencyLimitsV2Service) Increment(ctx context.Context, names []string, slots int, mode string) error {
|
|
req := struct {
|
|
Names []string `json:"names"`
|
|
Slots int `json:"slots"`
|
|
Mode string `json:"mode,omitempty"`
|
|
}{
|
|
Names: names,
|
|
Slots: slots,
|
|
Mode: mode,
|
|
}
|
|
|
|
if err := s.client.post(ctx, "/v2/concurrency_limits/increment", req, nil); err != nil {
|
|
return fmt.Errorf("failed to increment v2 concurrency limits: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// IncrementWithLease increments active slots and returns a lease.
|
|
func (s *ConcurrencyLimitsV2Service) IncrementWithLease(ctx context.Context, names []string, slots int, mode string) (*models.ConcurrencyLimitWithLeaseResponse, error) {
|
|
req := struct {
|
|
Names []string `json:"names"`
|
|
Slots int `json:"slots"`
|
|
Mode string `json:"mode,omitempty"`
|
|
}{
|
|
Names: names,
|
|
Slots: slots,
|
|
Mode: mode,
|
|
}
|
|
|
|
var resp models.ConcurrencyLimitWithLeaseResponse
|
|
if err := s.client.post(ctx, "/v2/concurrency_limits/increment-with-lease", req, &resp); err != nil {
|
|
return nil, fmt.Errorf("failed to increment v2 concurrency limits with lease: %w", err)
|
|
}
|
|
return &resp, nil
|
|
}
|
|
|
|
// Decrement decrements active slots for v2 concurrency limits.
|
|
func (s *ConcurrencyLimitsV2Service) Decrement(ctx context.Context, names []string, slots int) error {
|
|
req := struct {
|
|
Names []string `json:"names"`
|
|
Slots int `json:"slots"`
|
|
}{
|
|
Names: names,
|
|
Slots: slots,
|
|
}
|
|
|
|
if err := s.client.post(ctx, "/v2/concurrency_limits/decrement", req, nil); err != nil {
|
|
return fmt.Errorf("failed to decrement v2 concurrency limits: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// DecrementWithLease decrements active slots using a lease ID.
|
|
func (s *ConcurrencyLimitsV2Service) DecrementWithLease(ctx context.Context, names []string, slots int, leaseID uuid.UUID) error {
|
|
req := struct {
|
|
Names []string `json:"names"`
|
|
Slots int `json:"slots"`
|
|
LeaseID uuid.UUID `json:"lease_id"`
|
|
}{
|
|
Names: names,
|
|
Slots: slots,
|
|
LeaseID: leaseID,
|
|
}
|
|
|
|
if err := s.client.post(ctx, "/v2/concurrency_limits/decrement-with-lease", req, nil); err != nil {
|
|
return fmt.Errorf("failed to decrement v2 concurrency limits with lease: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// RenewLease renews a concurrency lease.
|
|
func (s *ConcurrencyLimitsV2Service) RenewLease(ctx context.Context, leaseID uuid.UUID) error {
|
|
path := joinPath("/v2/concurrency_limits/leases", leaseID.String(), "renew")
|
|
if err := s.client.post(ctx, path, nil, nil); err != nil {
|
|
return fmt.Errorf("failed to renew concurrency lease: %w", err)
|
|
}
|
|
return nil
|
|
}
|