[WIP] v0.7.0 fixes and changes to cache and remotefetcher
This commit is contained in:
191
pkg/remotefetcher/cache.go
Normal file
191
pkg/remotefetcher/cache.go
Normal file
@ -0,0 +1,191 @@
|
||||
package remotefetcher
|
||||
|
||||
import (
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
|
||||
"gopkg.in/yaml.v3"
|
||||
)
|
||||
|
||||
type CacheData struct {
|
||||
Hash string `yaml:"hash"`
|
||||
Path string `yaml:"path"`
|
||||
Type string `yaml:"type"`
|
||||
}
|
||||
|
||||
type Cache struct {
|
||||
mu sync.Mutex
|
||||
store map[string]CacheData
|
||||
file string
|
||||
dir string
|
||||
}
|
||||
|
||||
func NewCache(file, dir string) (*Cache, error) {
|
||||
cache := &Cache{
|
||||
store: make(map[string]CacheData),
|
||||
file: file,
|
||||
dir: dir,
|
||||
}
|
||||
err := cache.loadFromFile()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return cache, nil
|
||||
}
|
||||
|
||||
func (c *Cache) loadFromFile() error {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
if _, err := os.Stat(c.file); os.IsNotExist(err) {
|
||||
return nil
|
||||
}
|
||||
|
||||
data, err := os.ReadFile(c.file)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var cacheData []CacheData
|
||||
err = yaml.Unmarshal(data, &cacheData)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, item := range cacheData {
|
||||
c.store[item.Hash] = item
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Cache) saveToFile() error {
|
||||
// println("Saving cache to file:", c.file)
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
var cacheData []CacheData
|
||||
for _, data := range c.store {
|
||||
cacheData = append(cacheData, data)
|
||||
}
|
||||
|
||||
data, err := yaml.Marshal(cacheData)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return os.WriteFile(c.file, data, 0644)
|
||||
}
|
||||
|
||||
func (c *Cache) Get(hash string) ([]byte, CacheData, bool) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
println("Getting cache data for hash:", hash)
|
||||
cacheData, exists := c.store[hash]
|
||||
if !exists {
|
||||
println("Cache data does not exist for hash:", hash)
|
||||
return nil, CacheData{}, false
|
||||
}
|
||||
|
||||
data, err := os.ReadFile(cacheData.Path)
|
||||
if err != nil {
|
||||
return nil, CacheData{}, false
|
||||
}
|
||||
|
||||
return data, cacheData, true
|
||||
}
|
||||
|
||||
func (c *Cache) AddDataToStore(hash string, cacheData CacheData) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
c.store[hash] = cacheData
|
||||
}
|
||||
|
||||
func (c *Cache) Set(source, hash string, data []byte, dataType string) (CacheData, error) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
fileName := filepath.Base(source)
|
||||
|
||||
path := filepath.Join(c.dir, fmt.Sprintf("%s-%s", fileName, hash))
|
||||
|
||||
if _, err := os.Stat(path); os.IsNotExist(err) {
|
||||
os.MkdirAll(c.dir, 0700)
|
||||
}
|
||||
|
||||
err := os.WriteFile(path, data, 0644)
|
||||
if err != nil {
|
||||
return CacheData{}, err
|
||||
}
|
||||
|
||||
cacheData := CacheData{
|
||||
Hash: hash,
|
||||
Path: path,
|
||||
Type: dataType,
|
||||
}
|
||||
|
||||
c.store[hash] = cacheData
|
||||
|
||||
// Unlock before calling saveToFile to avoid double-locking
|
||||
c.mu.Unlock()
|
||||
err = c.saveToFile()
|
||||
c.mu.Lock()
|
||||
if err != nil {
|
||||
return CacheData{}, err
|
||||
}
|
||||
|
||||
// fmt.Printf("Cache data: %v", cacheData)
|
||||
return cacheData, nil
|
||||
}
|
||||
|
||||
type CachedFetcher struct {
|
||||
data []byte
|
||||
path string
|
||||
dataType string
|
||||
}
|
||||
|
||||
func (cf *CachedFetcher) Fetch(source string) ([]byte, error) {
|
||||
return cf.data, nil
|
||||
}
|
||||
|
||||
func (cf *CachedFetcher) Parse(data []byte, target interface{}) error {
|
||||
if cf.dataType == "yaml" {
|
||||
return yaml.Unmarshal(data, target)
|
||||
}
|
||||
return errors.New("parse not supported on cached fetcher for scripts")
|
||||
}
|
||||
|
||||
func (cf *CachedFetcher) Hash(data []byte) string {
|
||||
hash := sha256.Sum256(data)
|
||||
return hex.EncodeToString(hash[:])
|
||||
}
|
||||
|
||||
// Function to read and parse the hashMetadataSample.yml file
|
||||
func LoadMetadataFromFile(filePath string) ([]*CacheData, error) {
|
||||
// fmt.Println("Loading metadata from file:", filePath)
|
||||
if _, err := os.Stat(filePath); os.IsNotExist(err) {
|
||||
// Create the file if it does not exist
|
||||
emptyData := []byte("[]")
|
||||
err := os.WriteFile(filePath, emptyData, 0644)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
data, err := os.ReadFile(filePath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var cacheData []*CacheData
|
||||
err = yaml.Unmarshal(data, &cacheData)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return cacheData, nil
|
||||
}
|
75
pkg/remotefetcher/configfetcher.go
Normal file
75
pkg/remotefetcher/configfetcher.go
Normal file
@ -0,0 +1,75 @@
|
||||
package remotefetcher
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"strings"
|
||||
)
|
||||
|
||||
type ConfigFetcher interface {
|
||||
// Fetch retrieves the configuration from the specified URL or source
|
||||
// Returns the raw data as bytes or an error
|
||||
Fetch(source string) ([]byte, error)
|
||||
|
||||
// Parse decodes the raw data into a Go structure (e.g., Commands, CommandLists)
|
||||
// Takes the raw data as input and populates the target interface
|
||||
Parse(data []byte, target interface{}) error
|
||||
|
||||
// Hash returns the hash of the configuration data
|
||||
Hash(data []byte) string
|
||||
}
|
||||
|
||||
// ErrFileNotFound is returned when the file is not found and should be ignored
|
||||
var ErrFileNotFound = errors.New("remotefetcher: file not found")
|
||||
|
||||
func NewConfigFetcher(source string, cache *Cache, options ...Option) (ConfigFetcher, error) {
|
||||
var fetcher ConfigFetcher
|
||||
var dataType string
|
||||
|
||||
config := FetcherConfig{}
|
||||
for _, option := range options {
|
||||
option(&config)
|
||||
}
|
||||
if strings.HasPrefix(source, "http") || strings.HasPrefix(source, "https") {
|
||||
fetcher = NewHTTPFetcher(options...)
|
||||
dataType = "yaml"
|
||||
} else if strings.HasPrefix(source, "s3") {
|
||||
var err error
|
||||
fetcher, err = NewS3Fetcher(options...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
dataType = "yaml"
|
||||
} else {
|
||||
fetcher = &LocalFetcher{}
|
||||
dataType = "yaml"
|
||||
|
||||
return fetcher, nil
|
||||
}
|
||||
|
||||
//TODO: should local files be cached?
|
||||
|
||||
data, err := fetcher.Fetch(source)
|
||||
if err != nil {
|
||||
if config.IgnoreFileNotFound && isFileNotFoundError(err) {
|
||||
return nil, ErrFileNotFound
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
hash := fetcher.Hash(data)
|
||||
if cachedData, cacheMeta, exists := cache.Get(hash); exists {
|
||||
return &CachedFetcher{data: cachedData, path: cacheMeta.Path, dataType: cacheMeta.Type}, nil
|
||||
}
|
||||
|
||||
cacheData, err := cache.Set(source, hash, data, dataType)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &CachedFetcher{data: data, path: cacheData.Path, dataType: cacheData.Type}, nil
|
||||
}
|
||||
|
||||
func isFileNotFoundError(err error) bool {
|
||||
// Implement logic to check if the error is a "file not found" error
|
||||
// This can be based on the error type or message
|
||||
return strings.Contains(err.Error(), "file not found")
|
||||
}
|
60
pkg/remotefetcher/http.go
Normal file
60
pkg/remotefetcher/http.go
Normal file
@ -0,0 +1,60 @@
|
||||
package remotefetcher
|
||||
|
||||
import (
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"io"
|
||||
"net/http"
|
||||
|
||||
"gopkg.in/yaml.v3"
|
||||
)
|
||||
|
||||
type HTTPFetcher struct {
|
||||
HTTPClient *http.Client
|
||||
config FetcherConfig
|
||||
}
|
||||
|
||||
// NewHTTPFetcher creates a new instance of HTTPFetcher with the provided options.
|
||||
func NewHTTPFetcher(options ...Option) *HTTPFetcher {
|
||||
cfg := &FetcherConfig{}
|
||||
for _, opt := range options {
|
||||
opt(cfg)
|
||||
}
|
||||
|
||||
// Initialize HTTP client if not provided
|
||||
if cfg.HTTPClient == nil {
|
||||
cfg.HTTPClient = http.DefaultClient
|
||||
}
|
||||
|
||||
return &HTTPFetcher{HTTPClient: cfg.HTTPClient, config: *cfg}
|
||||
}
|
||||
|
||||
// Fetch retrieves the file from the specified source URL
|
||||
func (h *HTTPFetcher) Fetch(source string) ([]byte, error) {
|
||||
resp, err := http.Get(source)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode == http.StatusNotFound && h.config.IgnoreFileNotFound {
|
||||
return nil, ErrFileNotFound
|
||||
}
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return nil, errors.New("failed to fetch remote config: " + resp.Status)
|
||||
}
|
||||
|
||||
return io.ReadAll(resp.Body)
|
||||
}
|
||||
|
||||
// Parse decodes the raw data into the provided target structure
|
||||
func (h *HTTPFetcher) Parse(data []byte, target interface{}) error {
|
||||
return yaml.Unmarshal(data, target)
|
||||
}
|
||||
|
||||
func (h *HTTPFetcher) Hash(data []byte) string {
|
||||
hash := sha256.Sum256(data)
|
||||
return hex.EncodeToString(hash[:])
|
||||
}
|
42
pkg/remotefetcher/local.go
Normal file
42
pkg/remotefetcher/local.go
Normal file
@ -0,0 +1,42 @@
|
||||
package remotefetcher
|
||||
|
||||
import (
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
"io"
|
||||
"os"
|
||||
|
||||
"gopkg.in/yaml.v3"
|
||||
)
|
||||
|
||||
type LocalFetcher struct {
|
||||
config FetcherConfig
|
||||
}
|
||||
|
||||
// Fetch retrieves the file from the specified local file path
|
||||
func (l *LocalFetcher) Fetch(source string) ([]byte, error) {
|
||||
// Check if the file exists
|
||||
if _, err := os.Stat(source); os.IsNotExist(err) {
|
||||
if l.config.IgnoreFileNotFound {
|
||||
return nil, ErrFileNotFound
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
file, err := os.Open(source)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
return io.ReadAll(file)
|
||||
}
|
||||
|
||||
// Parse decodes the raw data into the provided target structure
|
||||
func (l *LocalFetcher) Parse(data []byte, target interface{}) error {
|
||||
return yaml.Unmarshal(data, target)
|
||||
}
|
||||
|
||||
func (l *LocalFetcher) Hash(data []byte) string {
|
||||
hash := sha256.Sum256(data)
|
||||
return hex.EncodeToString(hash[:])
|
||||
}
|
37
pkg/remotefetcher/options.go
Normal file
37
pkg/remotefetcher/options.go
Normal file
@ -0,0 +1,37 @@
|
||||
package remotefetcher
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
"github.com/aws/aws-sdk-go-v2/service/s3"
|
||||
)
|
||||
|
||||
// Option is a function that configures a fetcher.
|
||||
type Option func(*FetcherConfig)
|
||||
|
||||
// FetcherConfig holds the configuration for a fetcher.
|
||||
type FetcherConfig struct {
|
||||
S3Client *s3.Client
|
||||
HTTPClient *http.Client
|
||||
IgnoreFileNotFound bool
|
||||
}
|
||||
|
||||
// WithS3Client sets the S3 client for the fetcher.
|
||||
func WithS3Client(client *s3.Client) Option {
|
||||
return func(cfg *FetcherConfig) {
|
||||
cfg.S3Client = client
|
||||
}
|
||||
}
|
||||
|
||||
// WithHTTPClient sets the HTTP client for the fetcher.
|
||||
func WithHTTPClient(client *http.Client) Option {
|
||||
return func(cfg *FetcherConfig) {
|
||||
cfg.HTTPClient = client
|
||||
}
|
||||
}
|
||||
|
||||
func IgnoreFileNotFound() Option {
|
||||
return func(cfg *FetcherConfig) {
|
||||
cfg.IgnoreFileNotFound = true
|
||||
}
|
||||
}
|
91
pkg/remotefetcher/s3.go
Normal file
91
pkg/remotefetcher/s3.go
Normal file
@ -0,0 +1,91 @@
|
||||
package remotefetcher
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"strings"
|
||||
|
||||
"github.com/aws/aws-sdk-go-v2/config"
|
||||
"github.com/aws/aws-sdk-go-v2/service/s3"
|
||||
"github.com/aws/aws-sdk-go-v2/service/s3/types"
|
||||
"gopkg.in/yaml.v3"
|
||||
)
|
||||
|
||||
type S3Fetcher struct {
|
||||
S3Client *s3.Client
|
||||
config FetcherConfig
|
||||
}
|
||||
|
||||
// NewS3Fetcher creates a new instance of S3Fetcher with the provided options.
|
||||
func NewS3Fetcher(options ...Option) (*S3Fetcher, error) {
|
||||
cfg := &FetcherConfig{}
|
||||
for _, opt := range options {
|
||||
opt(cfg)
|
||||
}
|
||||
|
||||
// Initialize S3 client if not provided
|
||||
if cfg.S3Client == nil {
|
||||
awsCfg, err := config.LoadDefaultConfig(context.TODO())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
cfg.S3Client = s3.NewFromConfig(awsCfg)
|
||||
}
|
||||
|
||||
return &S3Fetcher{S3Client: cfg.S3Client, config: *cfg}, nil
|
||||
}
|
||||
|
||||
// Fetch retrieves the configuration from an S3 bucket
|
||||
// Source should be in the format "bucket-name/object-key"
|
||||
func (s *S3Fetcher) Fetch(source string) ([]byte, error) {
|
||||
bucket, key, err := parseS3Source(source)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
resp, err := s.S3Client.GetObject(context.TODO(), &s3.GetObjectInput{
|
||||
Bucket: &bucket,
|
||||
Key: &key,
|
||||
})
|
||||
if err != nil {
|
||||
if err != nil {
|
||||
var notFound *types.NoSuchKey
|
||||
if errors.As(err, ¬Found) && s.config.IgnoreFileNotFound {
|
||||
return nil, ErrFileNotFound
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
buf := new(bytes.Buffer)
|
||||
_, err = buf.ReadFrom(resp.Body)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return buf.Bytes(), nil
|
||||
}
|
||||
|
||||
// Parse decodes the raw data into the provided target structure
|
||||
func (s *S3Fetcher) Parse(data []byte, target interface{}) error {
|
||||
return yaml.Unmarshal(data, target)
|
||||
}
|
||||
|
||||
// Helper function to parse S3 source into bucket and key
|
||||
func parseS3Source(source string) (bucket, key string, err error) {
|
||||
parts := strings.SplitN(source, "/", 2)
|
||||
if len(parts) != 2 {
|
||||
return "", "", errors.New("invalid S3 source format, expected bucket-name/object-key")
|
||||
}
|
||||
return parts[0], parts[1], nil
|
||||
}
|
||||
|
||||
func (s *S3Fetcher) Hash(data []byte) string {
|
||||
hash := sha256.Sum256(data)
|
||||
return hex.EncodeToString(hash[:])
|
||||
}
|
Reference in New Issue
Block a user