package tools
import (
	"errors"
	"fmt"
	"os"
	"path/filepath"
	"sort"
	"sync"
	"sync/atomic"
)
// CallsFromSourceInput holds the parameters for a calls-from-source run.
// Exactly one of Folder or File should be set.
type CallsFromSourceInput struct {
	Folder string `json:"folder"` // directory to scan for source files
	File   string `json:"file"`   // single source file to process instead of a folder
	Delete bool   `json:"delete"` // remove each source file after its data file is written

	// ProgressHandler, when non-nil, is called after each processed file
	// with (processed, total, base name). Excluded from JSON.
	ProgressHandler ProgressHandler `json:"-"`
}
// CallsFromSourceOutput is the aggregate result of a calls-from-source run.
// On failure, Error carries the message so it survives JSON serialization.
type CallsFromSourceOutput struct {
	Calls            []ClusteredCall `json:"calls"`              // all calls, sorted by file then start time
	TotalCalls       int             `json:"total_calls"`        // len(Calls)
	SpeciesCount     map[string]int  `json:"species_count"`      // number of calls per ebird code
	DataFilesWritten int             `json:"data_files_written"` // files ProcessFile reported as written
	DataFilesSkipped int             `json:"data_files_skipped"` // files ProcessFile reported as skipped
	FilesProcessed   int             `json:"files_processed"`    // source files handled
	FilesDeleted     int             `json:"files_deleted"`      // source files removed (Delete mode)
	Filter           string          `json:"filter"`             // name of the CallSource used
	Error            *string         `json:"error,omitempty"`    // failure message, nil on success
}
// CallSource abstracts one kind of call-bearing source file so that the
// sequential and parallel processing pipelines can be shared across sources.
type CallSource interface {
	// Name returns a short label for this source, used in error messages
	// and in the output's Filter field.
	Name() string
	// FindFiles lists this source's files under folder.
	FindFiles(folder string) ([]string, error)
	// ProcessFile extracts clustered calls from the file at path, using
	// cache for per-directory state; written and skipped report whether
	// the file's derived data file was written or skipped.
	ProcessFile(path string, cache *DirCache) (calls []ClusteredCall, written, skipped bool, err error)
}
// callsFromSource resolves the list of files to process for src (either the
// single input.File or everything FindFiles discovers under input.Folder),
// then dispatches to the sequential or parallel implementation depending on
// how many files were found.
//
// Exactly one of input.File or input.Folder must be set. The returned output
// always carries src.Name() in Filter and, on failure, the error message in
// Error so it survives JSON serialization.
func callsFromSource(src CallSource, input CallsFromSourceInput) (CallsFromSourceOutput, error) {
	// Below this many files the worker-pool machinery costs more than it
	// saves, so use the simple sequential loop instead.
	const parallelThreshold = 10

	var output CallsFromSourceOutput
	output.Filter = src.Name()

	// fail records msg on the output (for callers that serialize it) and
	// returns the same text as the Go error.
	fail := func(msg string) (CallsFromSourceOutput, error) {
		output.Error = &msg
		return output, errors.New(msg)
	}

	var files []string
	switch {
	case input.File != "":
		files = []string{input.File}
	case input.Folder != "":
		var err error
		files, err = src.FindFiles(input.Folder)
		if err != nil {
			return fail(fmt.Sprintf("Failed to find %s files: %v", src.Name(), err))
		}
	default:
		return fail("Either --folder or --file must be specified")
	}
	if len(files) == 0 {
		return fail(fmt.Sprintf("No %s files found", src.Name()))
	}

	if len(files) < parallelThreshold {
		return callsFromSourceSequential(src, input, files)
	}
	return callsFromSourceParallel(src, input, files)
}
// callsFromSourceSequential processes files one at a time on the calling
// goroutine; it is used for small batches where the worker pool in
// callsFromSourceParallel is not worth the overhead.
//
// Per-directory DirCaches are created lazily and reused across files in the
// same directory. On the first processing or deletion error the function
// stops, records the message in output.Error, and returns; the aggregate
// counters are only populated on full success.
func callsFromSourceSequential(src CallSource, input CallsFromSourceInput, files []string) (CallsFromSourceOutput, error) {
	var output CallsFromSourceOutput
	output.Filter = src.Name()

	// fail mirrors the error-reporting convention used by callsFromSource:
	// the message is stored on the output and also returned as the error.
	fail := func(msg string) (CallsFromSourceOutput, error) {
		output.Error = &msg
		return output, errors.New(msg)
	}

	// One DirCache per directory; pre-seed the root folder when given.
	dirCaches := make(map[string]*DirCache)
	if input.Folder != "" {
		dirCaches[input.Folder] = NewDirCache(input.Folder)
	}

	speciesCount := make(map[string]int)
	var allCalls []ClusteredCall
	dataFilesWritten := 0
	dataFilesSkipped := 0
	filesProcessed := 0
	filesDeleted := 0
	for _, file := range files {
		dir := filepath.Dir(file)
		cache := dirCaches[dir]
		if cache == nil {
			cache = NewDirCache(dir)
			dirCaches[dir] = cache
		}
		calls, written, skipped, err := src.ProcessFile(file, cache)
		if err != nil {
			return fail(fmt.Sprintf("Error processing %s: %v", file, err))
		}
		if written {
			dataFilesWritten++
		}
		if skipped {
			dataFilesSkipped++
		}
		allCalls = append(allCalls, calls...)
		for _, call := range calls {
			speciesCount[call.EbirdCode]++
		}
		filesProcessed++
		// Only delete a source file whose data was actually written, so a
		// skipped file is never lost.
		if input.Delete && written {
			if err := os.Remove(file); err != nil {
				return fail(fmt.Sprintf("Failed to delete %s: %v", file, err))
			}
			filesDeleted++
		}
		if input.ProgressHandler != nil {
			input.ProgressHandler(filesProcessed, len(files), filepath.Base(file))
		}
	}

	// Deterministic output order: by source file, then by call start time.
	sort.Slice(allCalls, func(i, j int) bool {
		if allCalls[i].File != allCalls[j].File {
			return allCalls[i].File < allCalls[j].File
		}
		return allCalls[i].StartTime < allCalls[j].StartTime
	})
	output.Calls = allCalls
	output.TotalCalls = len(allCalls)
	output.SpeciesCount = speciesCount
	output.DataFilesWritten = dataFilesWritten
	output.DataFilesSkipped = dataFilesSkipped
	output.FilesProcessed = filesProcessed
	output.FilesDeleted = filesDeleted
	return output, nil
}
// sourceJob is one unit of work for sourceWorker: a single file to process.
type sourceJob struct {
	filePath string // path of the source file to process
}
// sourceResult is the outcome of processing a single file. Its accessor
// methods satisfy the result interface consumed via the parallelResult
// channel (see sourceWorker / aggregateResults).
type sourceResult struct {
	path    string          // file that was processed
	calls   []ClusteredCall // calls extracted from the file
	written bool            // data file was written
	skipped bool            // data file was skipped
	err     error           // non-nil if processing failed
}

// filePath returns the path of the processed file.
func (r sourceResult) filePath() string { return r.path }

// getCalls returns the calls extracted from the file.
func (r sourceResult) getCalls() []ClusteredCall { return r.calls }

// wasWritten reports whether the data file was written.
func (r sourceResult) wasWritten() bool { return r.written }

// wasSkipped reports whether the data file was skipped.
func (r sourceResult) wasSkipped() bool { return r.skipped }

// getError returns the processing error, if any.
func (r sourceResult) getError() error { return r.err }
// callsFromSourceParallel fans the files out to DOT_DATA_WORKERS goroutines
// and aggregates their results. Behavior matches the sequential path: the
// first error aborts the run and is recorded on the output.
func callsFromSourceParallel(src CallSource, input CallsFromSourceInput, files []string) (CallsFromSourceOutput, error) {
	var output CallsFromSourceOutput
	output.Filter = src.Name()

	fileCount := len(files)
	var doneCount atomic.Int32

	// Concurrency-safe map of per-directory caches shared by all workers;
	// seed the root folder's cache up front when one was supplied.
	caches := &sync.Map{}
	if input.Folder != "" {
		caches.Store(input.Folder, NewDirCache(input.Folder))
	}

	// Both channels are buffered to the job count so enqueueing and result
	// delivery never block.
	jobs := make(chan sourceJob, fileCount)
	results := make(chan parallelResult, fileCount)

	var wg sync.WaitGroup
	for w := 0; w < DOT_DATA_WORKERS; w++ {
		wg.Add(1)
		go sourceWorker(src, caches, jobs, results, &wg)
	}
	for _, path := range files {
		jobs <- sourceJob{filePath: path}
	}
	close(jobs)

	// Close results once every worker has drained the job queue, so the
	// aggregator's range loop terminates.
	go func() {
		wg.Wait()
		close(results)
	}()

	stats := aggregateResults(results, fileCount, &doneCount, input.Delete, input.ProgressHandler)
	if stats.firstErr != nil {
		msg := stats.firstErr.Error()
		output.Error = &msg
		return output, stats.firstErr
	}

	sortCallsByFileAndTime(stats.calls)
	output.Calls = stats.calls
	output.TotalCalls = len(stats.calls)
	output.SpeciesCount = stats.speciesCount
	output.DataFilesWritten = stats.dataFilesWritten
	output.DataFilesSkipped = stats.dataFilesSkipped
	output.FilesProcessed = stats.filesProcessed
	output.FilesDeleted = stats.filesDeleted
	return output, nil
}
// sourceWorker consumes jobs until the channel is closed, processing each
// file with a per-directory DirCache shared through dirCaches, and emits one
// sourceResult per job (including failures, which carry err for the
// aggregator to report). Decrements wg on exit.
func sourceWorker(src CallSource, dirCaches *sync.Map, jobs <-chan sourceJob, results chan<- parallelResult, wg *sync.WaitGroup) {
	defer wg.Done()
	for job := range jobs {
		dir := filepath.Dir(job.filePath)
		// Fast path: reuse an existing cache for this directory. On a miss,
		// LoadOrStore guarantees that workers racing on the same directory
		// all end up sharing one canonical DirCache — the previous
		// Load-then-Store sequence could install two different caches.
		cached, ok := dirCaches.Load(dir)
		if !ok {
			cached, _ = dirCaches.LoadOrStore(dir, NewDirCache(dir))
		}
		cache := cached.(*DirCache)
		calls, written, skipped, err := src.ProcessFile(job.filePath, cache)
		results <- sourceResult{
			path:    job.filePath,
			calls:   calls,
			written: written,
			skipped: skipped,
			err:     err,
		}
	}
}