Try to understand how cache files are updated

Having to keep a copy of all the files in memory is weird.

We shouldn't have to do that.
This commit is contained in:
Frank Denis 2023-04-06 14:19:25 +02:00
parent b898e07066
commit 92ed5b95e0
3 changed files with 57 additions and 51 deletions

View file

@ -894,7 +894,7 @@ func (config *Config) loadSource(proxy *Proxy, cfgSourceName string, cfgSource *
cfgSource.Prefix, cfgSource.Prefix,
) )
if err != nil { if err != nil {
if len(source.in) <= 0 { if len(source.bin) <= 0 {
dlog.Criticalf("Unable to retrieve source [%s]: [%s]", cfgSourceName, err) dlog.Criticalf("Unable to retrieve source [%s]: [%s]", cfgSourceName, err)
return err return err
} }

View file

@ -3,10 +3,10 @@ package main
import ( import (
"bytes" "bytes"
"fmt" "fmt"
"math"
"math/rand" "math/rand"
"net/url" "net/url"
"os" "os"
"path/filepath"
"strings" "strings"
"time" "time"
@ -31,13 +31,14 @@ const (
type Source struct { type Source struct {
name string name string
urls []*url.URL urls []*url.URL
format SourceFormat bin []byte // copy of the file content - there's something wrong in our logic, we shouldn't need to keep that in memory
in []byte sig []byte // copy of the signature
minisignKey *minisign.PublicKey minisignKey *minisign.PublicKey
cacheFile string cacheFile string
prefix string
cacheTTL, prefetchDelay time.Duration cacheTTL, prefetchDelay time.Duration
refresh time.Time refresh time.Time
prefix string format SourceFormat
} }
func (source *Source) checkSignature(bin, sig []byte) (err error) { func (source *Source) checkSignature(bin, sig []byte) (err error) {
@ -48,11 +49,10 @@ func (source *Source) checkSignature(bin, sig []byte) (err error) {
return err return err
} }
// timeNow can be replaced by tests to provide a static value // timeNow() can be replaced by tests to provide a static value
var timeNow = time.Now var timeNow = time.Now
func (source *Source) fetchFromCache(now time.Time) (delay time.Duration, err error) { func (source *Source) fetchFromCache(now time.Time) (delay time.Duration, bin []byte, sig []byte, err error) {
var bin, sig []byte
if bin, err = os.ReadFile(source.cacheFile); err != nil { if bin, err = os.ReadFile(source.cacheFile); err != nil {
return return
} }
@ -62,14 +62,13 @@ func (source *Source) fetchFromCache(now time.Time) (delay time.Duration, err er
if err = source.checkSignature(bin, sig); err != nil { if err = source.checkSignature(bin, sig); err != nil {
return return
} }
source.in = bin
var fi os.FileInfo var fi os.FileInfo
if fi, err = os.Stat(source.cacheFile); err != nil { if fi, err = os.Stat(source.cacheFile); err != nil {
return return
} }
if elapsed := now.Sub(fi.ModTime()); elapsed < source.cacheTTL { if elapsed := now.Sub(fi.ModTime()); elapsed < source.cacheTTL {
delay = source.prefetchDelay - elapsed delay = source.prefetchDelay - elapsed
dlog.Debugf("Source [%s] cache file [%s] is still fresh, next update: %v", source.name, source.cacheFile, delay) dlog.Debugf("Source [%s] cache file [%s] is still fresh, next update in %v min", source.name, source.cacheFile, math.Round(delay.Minutes()))
} else { } else {
dlog.Debugf("Source [%s] cache file [%s] needs to be refreshed", source.name, source.cacheFile) dlog.Debugf("Source [%s] cache file [%s] needs to be refreshed", source.name, source.cacheFile)
} }
@ -98,25 +97,25 @@ func writeSource(f string, bin, sig []byte) (err error) {
return fSig.Commit() return fSig.Commit()
} }
func (source *Source) writeToCache(bin, sig []byte, now time.Time) { // Update the cache file with the new data
func (source *Source) updateCache(bin, sig []byte, now time.Time) error {
f := source.cacheFile f := source.cacheFile
var writeErr error // an error writing cache isn't fatal // If the data and signature are unchanged, update the files timestamps only
defer func() { if bin != nil && bytes.Equal(source.bin, bin) && sig != nil && bytes.Equal(source.sig, sig) {
source.in = bin dlog.Debugf("Source [%s] content and signature are unchanged", source.name)
if writeErr == nil { if err := os.Chtimes(f, now, now); err != nil {
return return err
} }
if absPath, absErr := filepath.Abs(f); absErr == nil { if err := os.Chtimes(f+".minisig", now, now); err != nil {
f = absPath return err
}
dlog.Warnf("%s: %s", f, writeErr)
}()
if !bytes.Equal(source.in, bin) {
if writeErr = writeSource(f, bin, sig); writeErr != nil {
return
} }
return nil
} }
writeErr = os.Chtimes(f, now, now) if err := writeSource(f, bin, sig); err != nil {
dlog.Warnf("Source [%s] failed to update cache file [%s]: %v", source.name, f, err)
return err
}
return nil
} }
func (source *Source) parseURLs(urls []string) { func (source *Source) parseURLs(urls []string) {
@ -134,11 +133,10 @@ func fetchFromURL(xTransport *XTransport, u *url.URL) (bin []byte, err error) {
return bin, err return bin, err
} }
func (source *Source) fetchWithCache(xTransport *XTransport, now time.Time) (delay time.Duration, err error) { func (source *Source) fetchWithCache(xTransport *XTransport, now time.Time) (delay time.Duration, bin []byte, sig []byte, err error) {
if delay, err = source.fetchFromCache(now); err != nil { if delay, bin, sig, err = source.fetchFromCache(now); err != nil {
if len(source.urls) == 0 { if len(source.urls) == 0 {
dlog.Errorf("Source [%s] cache file [%s] not present and no valid URL", source.name, source.cacheFile) dlog.Fatalf("Source [%s] cache file [%s] not present and no valid URL", source.name, source.cacheFile)
return
} }
dlog.Debugf("Source [%s] cache file [%s] not present", source.name, source.cacheFile) dlog.Debugf("Source [%s] cache file [%s] not present", source.name, source.cacheFile)
} }
@ -148,10 +146,9 @@ func (source *Source) fetchWithCache(xTransport *XTransport, now time.Time) (del
}() }()
} }
if len(source.urls) == 0 || delay > 0 { if len(source.urls) == 0 || delay > 0 {
return source.sig = sig
return delay, bin, sig, nil // source is still valid
} }
delay = MinimumPrefetchInterval
var bin, sig []byte
for _, srcURL := range source.urls { for _, srcURL := range source.urls {
dlog.Infof("Source [%s] loading from URL [%s]", source.name, srcURL) dlog.Infof("Source [%s] loading from URL [%s]", source.name, srcURL)
sigURL := &url.URL{} sigURL := &url.URL{}
@ -171,11 +168,13 @@ func (source *Source) fetchWithCache(xTransport *XTransport, now time.Time) (del
dlog.Debugf("Source [%s] failed signature check using URL [%s]", source.name, srcURL) dlog.Debugf("Source [%s] failed signature check using URL [%s]", source.name, srcURL)
} }
if err != nil { if err != nil {
return return MinimumPrefetchInterval, nil, nil, err
} }
source.writeToCache(bin, sig, now) if err := source.updateCache(bin, sig, now); err != nil {
delay = source.prefetchDelay return MinimumPrefetchInterval, bin, sig, err // keep using the old data
return }
source.sig = sig
return source.prefetchDelay, bin, sig, nil
} }
// NewSource loads a new source using the given cacheFile and urls, ensuring it has a valid signature // NewSource loads a new source using the given cacheFile and urls, ensuring it has a valid signature
@ -211,10 +210,14 @@ func NewSource(
return source, err return source, err
} }
source.parseURLs(urls) source.parseURLs(urls)
if _, err = source.fetchWithCache(xTransport, timeNow()); err == nil { _, bin, sig, err := source.fetchWithCache(xTransport, timeNow())
dlog.Noticef("Source [%s] loaded", name) if err != nil {
return nil, err
} }
return dlog.Noticef("Source [%s] loaded", name)
source.bin = bin
source.sig = sig
return source, err
} }
// PrefetchSources downloads latest versions of given sources, ensuring they have a valid signature before caching // PrefetchSources downloads latest versions of given sources, ensuring they have a valid signature before caching
@ -226,13 +229,16 @@ func PrefetchSources(xTransport *XTransport, sources []*Source) time.Duration {
continue continue
} }
dlog.Debugf("Prefetching [%s]", source.name) dlog.Debugf("Prefetching [%s]", source.name)
if delay, err := source.fetchWithCache(xTransport, now); err != nil { delay, bin, sig, err := source.fetchWithCache(xTransport, now)
if err != nil {
dlog.Infof("Prefetching [%s] failed: %v, will retry in %v", source.name, err, interval) dlog.Infof("Prefetching [%s] failed: %v, will retry in %v", source.name, err, interval)
} else { continue
dlog.Debugf("Prefetching [%s] succeeded, next update: %v", source.name, delay) }
if delay >= MinimumPrefetchInterval && (interval == MinimumPrefetchInterval || interval > delay) { source.bin = bin
interval = delay source.sig = sig
} dlog.Debugf("Prefetching [%s] succeeded, next update in %v min", source.name, math.Round(delay.Minutes()))
if delay >= MinimumPrefetchInterval && (interval == MinimumPrefetchInterval || interval > delay) {
interval = delay
} }
} }
return interval return interval
@ -254,8 +260,8 @@ func (source *Source) parseV2() ([]RegisteredServer, error) {
stampErrs = append(stampErrs, stampErr) stampErrs = append(stampErrs, stampErr)
dlog.Warn(stampErr) dlog.Warn(stampErr)
} }
in := string(source.in) bin := string(source.bin)
parts := strings.Split(in, "## ") parts := strings.Split(bin, "## ")
if len(parts) < 2 { if len(parts) < 2 {
return registeredServers, fmt.Errorf("Invalid format for source at [%v]", source.urls) return registeredServers, fmt.Errorf("Invalid format for source at [%v]", source.urls)
} }

View file

@ -284,9 +284,9 @@ func prepSourceTestCache(t *testing.T, d *SourceTestData, e *SourceTestExpect, s
e.cache = []SourceFixture{d.fixtures[state][source], d.fixtures[state][source+".minisig"]} e.cache = []SourceFixture{d.fixtures[state][source], d.fixtures[state][source+".minisig"]}
switch state { switch state {
case TestStateCorrect: case TestStateCorrect:
e.Source.in, e.success = e.cache[0].content, true e.Source.bin, e.success = e.cache[0].content, true
case TestStateExpired: case TestStateExpired:
e.Source.in = e.cache[0].content e.Source.bin = e.cache[0].content
case TestStatePartial, TestStatePartialSig: case TestStatePartial, TestStatePartialSig:
e.err = "signature" e.err = "signature"
case TestStateMissing, TestStateMissingSig, TestStateOpenErr, TestStateOpenSigErr: case TestStateMissing, TestStateMissingSig, TestStateOpenErr, TestStateOpenSigErr:
@ -339,7 +339,7 @@ func prepSourceTestDownload(
switch state { switch state {
case TestStateCorrect: case TestStateCorrect:
e.cache = []SourceFixture{d.fixtures[state][source], d.fixtures[state][source+".minisig"]} e.cache = []SourceFixture{d.fixtures[state][source], d.fixtures[state][source+".minisig"]}
e.Source.in, e.success = e.cache[0].content, true e.Source.bin, e.success = e.cache[0].content, true
fallthrough fallthrough
case TestStateMissingSig, TestStatePartial, TestStatePartialSig, TestStateReadSigErr: case TestStateMissingSig, TestStatePartial, TestStatePartialSig, TestStateReadSigErr:
d.reqExpect[path+".minisig"]++ d.reqExpect[path+".minisig"]++
@ -477,7 +477,7 @@ func TestPrefetchSources(t *testing.T) {
e.mtime = d.timeUpd e.mtime = d.timeUpd
s := &Source{} s := &Source{}
*s = *e.Source *s = *e.Source
s.in = nil s.bin = nil
sources = append(sources, s) sources = append(sources, s)
expects = append(expects, e) expects = append(expects, e)
} }