package utils
import (
"context"
"database/sql"
"fmt"
"os"
"path/filepath"
"sort"
"strings"
"time"
)
// FileImportError describes a problem encountered with a single file during a
// cluster import. Stage names the step that reported it; this file uses
// "parse", "hash" and "insert".
type FileImportError struct {
	FileName string `json:"file_name"` // base name of the file, not its full path
	Error    string `json:"error"`     // human-readable description of the problem
	Stage    string `json:"stage"`
}
// ClusterImportInput describes a folder of recordings to import into a cluster.
type ClusterImportInput struct {
	// FolderPath is the directory scanned for .wav files.
	FolderPath string
	// DatasetID is the dataset that each newly imported file is linked to
	// (via a file_dataset row).
	DatasetID string
	// LocationID identifies the location row whose coordinates and timezone
	// are used for timestamps and astronomical data; it is also stored on
	// each file row.
	LocationID string
	// ClusterID is stored on each imported file row.
	ClusterID string
	// Recursive enables scanning of subdirectories; subdirectories whose name
	// starts with "Clips_" are skipped.
	Recursive bool
}
type ClusterImportOutput struct {
TotalFiles int
ImportedFiles int
SkippedFiles int FailedFiles int
AudioMothFiles int
TotalDuration float64
ProcessingTime string
Errors []FileImportError
}
// locationData holds the columns of a location row that the import needs in
// order to timestamp and annotate recordings.
type locationData struct {
	// Latitude and Longitude are the site coordinates handed to the
	// astronomical calculation (presumably decimal degrees — confirm).
	Latitude, Longitude float64
	// TimezoneID is the location's timezone identifier, used to shift
	// filename-derived timestamps.
	TimezoneID string
}
// fileData is the per-file result of batchProcessFiles: everything needed to
// insert one recording into the database.
type fileData struct {
	FileName string // base name of the WAV file
	Hash     string // value from ComputeXXH64; used for duplicate detection

	Duration   float64 // from the WAV header (presumably seconds)
	SampleRate int     // from the WAV header

	// TimestampLocal is the recording start time, chosen in order of
	// preference from the AudioMoth comment, the filename, or the file's
	// modification time.
	TimestampLocal time.Time

	IsAudioMoth bool
	// MothData carries parsed AudioMoth comment fields; insertion skips the
	// moth_metadata row when it is nil.
	MothData *AudioMothData

	AstroData AstronomicalData
}
// ImportCluster scans input.FolderPath for non-empty .wav files, extracts
// their metadata and timestamps, and inserts them into the database under the
// given dataset, location and cluster.
//
// Per-file problems do not abort the import; they are collected in
// ClusterImportOutput.Errors. A non-nil error is returned only when the import
// as a whole cannot proceed: the folder is inaccessible or not a directory,
// the location cannot be loaded, scanning fails, or the database transaction
// fails.
//
// Errors may also contain non-fatal entries for files that were still
// imported (for example an AudioMoth comment that failed to parse when another
// timestamp source was available), and one file may appear more than once.
// FailedFiles is therefore derived from the file counts: every scanned file is
// either imported, skipped as a duplicate, or failed.
//
// AudioMothFiles and TotalDuration are computed over every file that was
// processed successfully, including duplicates that were then skipped.
func ImportCluster(
	database *sql.DB,
	input ClusterImportInput,
) (*ClusterImportOutput, error) {
	startTime := time.Now()

	info, err := os.Stat(input.FolderPath)
	if err != nil {
		return nil, fmt.Errorf("folder not accessible: %w", err)
	}
	if !info.IsDir() {
		return nil, fmt.Errorf("path is not a directory: %s", input.FolderPath)
	}

	// Named loc so the local does not shadow the locationData type.
	loc, err := getLocationData(database, input.LocationID)
	if err != nil {
		return nil, fmt.Errorf("failed to get location data: %w", err)
	}

	wavFiles, err := scanClusterFiles(input.FolderPath, input.Recursive)
	if err != nil {
		return nil, fmt.Errorf("failed to scan folder: %w", err)
	}
	if len(wavFiles) == 0 {
		return &ClusterImportOutput{
			TotalFiles:     0,
			ProcessingTime: time.Since(startTime).String(),
			Errors:         []FileImportError{},
		}, nil
	}

	filesData, processErrors := batchProcessFiles(wavFiles, loc)
	imported, skipped, insertErrors, err := insertClusterFiles(
		database,
		filesData,
		input.DatasetID,
		input.ClusterID,
		input.LocationID,
	)
	if err != nil {
		return nil, fmt.Errorf("database insertion failed: %w", err)
	}

	// Always non-nil, so the result serialises as [] rather than null, matching
	// the empty-folder result above.
	allErrors := make([]FileImportError, 0, len(processErrors)+len(insertErrors))
	allErrors = append(allErrors, processErrors...)
	allErrors = append(allErrors, insertErrors...)

	audiomothCount := 0
	totalDuration := 0.0
	for _, fd := range filesData {
		if fd.IsAudioMoth {
			audiomothCount++
		}
		totalDuration += fd.Duration
	}

	return &ClusterImportOutput{
		TotalFiles:    len(wavFiles),
		ImportedFiles: imported,
		SkippedFiles:  skipped,
		// len(allErrors) would over-count: it includes warnings for imported
		// files and can list a single file several times.
		FailedFiles:    len(wavFiles) - imported - skipped,
		AudioMothFiles: audiomothCount,
		TotalDuration:  totalDuration,
		ProcessingTime: time.Since(startTime).String(),
		Errors:         allErrors,
	}, nil
}
// getLocationData loads the latitude, longitude and timezone of the location
// row identified by locationID.
//
// Any query or scan failure is wrapped and returned; an unknown ID surfaces as
// a wrapped sql.ErrNoRows.
func getLocationData(database *sql.DB, locationID string) (*locationData, error) {
	const query = "SELECT latitude, longitude, timezone_id FROM location WHERE id = ?"

	loc := new(locationData)
	row := database.QueryRow(query, locationID)
	if scanErr := row.Scan(&loc.Latitude, &loc.Longitude, &loc.TimezoneID); scanErr != nil {
		return nil, fmt.Errorf("failed to query location data: %w", scanErr)
	}
	return loc, nil
}
func EnsureClusterPath(database *sql.DB, clusterID, folderPath string) error {
var currentPath sql.NullString
err := database.QueryRow("SELECT path FROM cluster WHERE id = ?", clusterID).Scan(¤tPath)
if err != nil {
return fmt.Errorf("failed to query cluster: %w", err)
}
if currentPath.Valid && currentPath.String != "" {
return nil
}
normalizedPath := NormalizeFolderPath(folderPath)
_, err = database.Exec(
"UPDATE cluster SET path = ?, last_modified = now() WHERE id = ?",
normalizedPath,
clusterID,
)
if err != nil {
return fmt.Errorf("failed to update cluster path: %w", err)
}
return nil
}
// scanClusterFiles returns the sorted paths of all non-empty files under
// rootPath whose extension is ".wav" (case-insensitive).
//
// When recursive is false, only the direct children of rootPath are
// considered. When recursive is true, the whole tree is walked, except that
// any subdirectory whose name starts with "Clips_" is skipped entirely.
// rootPath itself is always scanned, even if its own name starts with
// "Clips_", so both modes agree on the files directly inside rootPath.
//
// In recursive mode, any error reading a directory or entry aborts the scan
// and is returned. In non-recursive mode, entries that cannot be stat'ed are
// silently ignored.
func scanClusterFiles(rootPath string, recursive bool) ([]string, error) {
	isWAV := func(name string) bool {
		return strings.ToLower(filepath.Ext(name)) == ".wav"
	}

	var wavFiles []string
	if recursive {
		// os.DirEntry is an alias of fs.DirEntry, so no extra import is needed.
		err := filepath.WalkDir(rootPath, func(path string, d os.DirEntry, err error) error {
			if err != nil {
				return err
			}
			if d.IsDir() {
				// Never skip the root: the caller explicitly asked for it.
				if path != rootPath && strings.HasPrefix(d.Name(), "Clips_") {
					return filepath.SkipDir
				}
				return nil
			}
			if !isWAV(d.Name()) {
				return nil
			}
			// Only stat files that could qualify; WalkDir does not stat the
			// other entries.
			info, err := d.Info()
			if err != nil {
				return err
			}
			if info.Size() > 0 {
				wavFiles = append(wavFiles, path)
			}
			return nil
		})
		if err != nil {
			return nil, err
		}
	} else {
		entries, err := os.ReadDir(rootPath)
		if err != nil {
			return nil, err
		}
		for _, entry := range entries {
			if entry.IsDir() || !isWAV(entry.Name()) {
				continue
			}
			path := filepath.Join(rootPath, entry.Name())
			if info, err := os.Stat(path); err == nil && info.Size() > 0 {
				wavFiles = append(wavFiles, path)
			}
		}
	}

	sort.Strings(wavFiles)
	return wavFiles, nil
}
// batchProcessFiles reads metadata for each path in wavFiles and turns every
// usable file into a fileData ready for insertion. Problems are collected in
// the returned []FileImportError instead of aborting the batch.
//
// Steps, in order:
//  1. Parse every WAV header with ParseWAVHeader. Files whose header fails are
//     reported with Stage "parse" and excluded from all later steps.
//  2. Collect the base names of all header-parsed files accepted by
//     HasTimestampFilename, parse them in a single ParseFilenameTimestamps
//     call, then shift the results with ApplyTimezoneOffset using
//     location.TimezoneID. If either call fails, every file in that batch gets
//     a "parse" error. Those files are NOT excluded; they just have no
//     filename timestamp.
//  3. For each header-parsed file: hash it with ComputeXXH64 (on failure the
//     file gets a "hash" error and is dropped), pick a timestamp, and compute
//     astronomical data from the timestamp (converted to UTC), the duration
//     and the location's coordinates.
//
// Timestamp preference: the AudioMoth comment, then the filename timestamp,
// then the header's FileModTime. A file with none of these gets a "parse"
// error and is dropped.
//
// The returned filesData keeps the order of wavFiles, minus dropped files.
//
// NOTE(review): the returned errors can include non-fatal entries for files
// that still appear in filesData: a failed AudioMoth comment parse, or a
// failed filename batch when another timestamp source was available. This
// includes AudioMoth files whose timestamp came from the comment. One file can
// also appear more than once, so len(errors) is not a count of failed files.
func batchProcessFiles(wavFiles []string, location *locationData) ([]*fileData, []FileImportError) {
	var filesData []*fileData
	// Local named "errors"; harmless only because this file does not import
	// the errors package.
	var errors []FileImportError
	type wavInfo struct {
		path     string
		metadata *WAVMetadata
		err      error
	}
	// Step 1: read every header up front. Failures are reported in the next
	// loop and skipped by the final loop.
	wavInfos := make([]wavInfo, len(wavFiles))
	for i, path := range wavFiles {
		metadata, err := ParseWAVHeader(path)
		wavInfos[i] = wavInfo{path: path, metadata: metadata, err: err}
	}
	// filenamesForParsing[j] is the base name of wavInfos[filenameIndices[j]].
	var filenamesForParsing []string
	var filenameIndices []int
	for i, info := range wavInfos {
		if info.err != nil {
			errors = append(errors, FileImportError{
				FileName: filepath.Base(info.path),
				Error:    info.err.Error(),
				Stage:    "parse",
			})
			continue
		}
		if HasTimestampFilename(info.path) {
			filenamesForParsing = append(filenamesForParsing, filepath.Base(info.path))
			filenameIndices = append(filenameIndices, i)
		}
	}
	// Step 2: filename timestamps, keyed by index into wavInfos.
	filenameTimestampMap := make(map[int]time.Time)
	if len(filenamesForParsing) > 0 {
		filenameTimestamps, err := ParseFilenameTimestamps(filenamesForParsing)
		if err != nil {
			// The whole batch failed; record it against every participating file.
			for _, idx := range filenameIndices {
				errors = append(errors, FileImportError{
					FileName: filepath.Base(wavInfos[idx].path),
					Error:    fmt.Sprintf("filename timestamp parsing failed: %v", err),
					Stage:    "parse",
				})
			}
		} else {
			adjustedTimestamps, err := ApplyTimezoneOffset(filenameTimestamps, location.TimezoneID)
			if err != nil {
				for _, idx := range filenameIndices {
					errors = append(errors, FileImportError{
						FileName: filepath.Base(wavInfos[idx].path),
						Error:    fmt.Sprintf("timezone offset failed: %v", err),
						Stage:    "parse",
					})
				}
			} else {
				// Assumes both helpers return one timestamp per input, in
				// input order. The index j relies on it — TODO confirm.
				for j, idx := range filenameIndices {
					filenameTimestampMap[idx] = adjustedTimestamps[j]
				}
			}
		}
	}
	// Step 3: hash, choose a timestamp, compute astronomical data.
	for i, info := range wavInfos {
		if info.err != nil {
			continue
		}
		hash, err := ComputeXXH64(info.path)
		if err != nil {
			errors = append(errors, FileImportError{
				FileName: filepath.Base(info.path),
				Error:    fmt.Sprintf("hash calculation failed: %v", err),
				Stage:    "hash",
			})
			continue
		}
		var timestampLocal time.Time
		var isAudioMoth bool
		var mothData *AudioMothData
		if IsAudioMoth(info.metadata.Comment, info.metadata.Artist) {
			isAudioMoth = true
			mothData, err = ParseAudioMothComment(info.metadata.Comment)
			if err == nil {
				// NOTE(review): unlike filename timestamps, this value gets no
				// ApplyTimezoneOffset shift — confirm that is intended.
				timestampLocal = mothData.Timestamp
			} else {
				// Not fatal: the file falls through to the other timestamp
				// sources. mothData is whatever ParseAudioMothComment returned
				// on error (presumably nil — confirm).
				errors = append(errors, FileImportError{
					FileName: filepath.Base(info.path),
					Error:    fmt.Sprintf("AudioMoth comment parsing failed: %v", err),
					Stage:    "parse",
				})
			}
		}
		if timestampLocal.IsZero() {
			if ts, ok := filenameTimestampMap[i]; ok {
				timestampLocal = ts
			}
		}
		if timestampLocal.IsZero() {
			// NOTE(review): the mod time is used as-is, with no timezone
			// adjustment — confirm intended.
			if !info.metadata.FileModTime.IsZero() {
				timestampLocal = info.metadata.FileModTime
			}
		}
		if timestampLocal.IsZero() {
			errors = append(errors, FileImportError{
				FileName: filepath.Base(info.path),
				Error:    "no timestamp available (not AudioMoth, filename not parseable, and file mod time missing)",
				Stage:    "parse",
			})
			continue
		}
		astroData := CalculateAstronomicalData(
			timestampLocal.UTC(),
			info.metadata.Duration,
			location.Latitude,
			location.Longitude,
		)
		filesData = append(filesData, &fileData{
			FileName:       filepath.Base(info.path),
			Hash:           hash,
			Duration:       info.metadata.Duration,
			SampleRate:     info.metadata.SampleRate,
			TimestampLocal: timestampLocal,
			IsAudioMoth:    isAudioMoth,
			MothData:       mothData,
			AstroData:      astroData,
		})
	}
	return filesData, errors
}
// insertClusterFiles writes the processed files to the database inside a
// single transaction. Each new file is linked to datasetID and stored with
// clusterID and locationID.
//
// For every entry in filesData it either:
//   - skips the file (counted in skipped) when a row with the same xxh64_hash
//     already exists, including one inserted earlier in this same call; or
//   - inserts a file row, a file_dataset row and, for AudioMoth files with
//     parsed metadata, a moth_metadata row (counted in imported).
//
// Per-file failures are recorded in errors (Stage "insert"), and the loop
// moves on to the next file. If a later insert fails after a file's row was
// written, the rows already written for that file are deleted again, so a
// half-imported file is never committed. Left in place, the orphaned file row
// would make every future import skip the file as a duplicate. If that cleanup
// itself fails, the transaction is abandoned and a non-nil err is returned.
//
// A non-nil err means the transaction was not committed. imported and skipped
// are then 0, and errors holds whatever per-file problems were gathered before
// the failure.
func insertClusterFiles(
	database *sql.DB,
	filesData []*fileData,
	datasetID, clusterID, locationID string,
) (imported, skipped int, errors []FileImportError, err error) {
	ctx := context.Background()
	tx, err := database.BeginTx(ctx, nil)
	if err != nil {
		return 0, 0, nil, fmt.Errorf("failed to begin transaction: %w", err)
	}
	// After a successful Commit this Rollback is a harmless no-op.
	defer tx.Rollback()

	// Prepared once rather than re-parsed for every file.
	existsStmt, err := tx.PrepareContext(ctx,
		"SELECT EXISTS(SELECT 1 FROM file WHERE xxh64_hash = ?)")
	if err != nil {
		return 0, 0, nil, fmt.Errorf("failed to prepare duplicate-check statement: %w", err)
	}
	defer existsStmt.Close()

	fileStmt, err := tx.PrepareContext(ctx, `
		INSERT INTO file (
			id, file_name, xxh64_hash, location_id, timestamp_local,
			cluster_id, duration, sample_rate, maybe_solar_night, maybe_civil_night,
			moon_phase, created_at, last_modified, active
		) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, now(), now(), true)
	`)
	if err != nil {
		return 0, 0, nil, fmt.Errorf("failed to prepare file statement: %w", err)
	}
	defer fileStmt.Close()

	datasetStmt, err := tx.PrepareContext(ctx, `
		INSERT INTO file_dataset (file_id, dataset_id, created_at, last_modified)
		VALUES (?, ?, now(), now())
	`)
	if err != nil {
		return 0, 0, nil, fmt.Errorf("failed to prepare dataset statement: %w", err)
	}
	defer datasetStmt.Close()

	mothStmt, err := tx.PrepareContext(ctx, `
		INSERT INTO moth_metadata (
			file_id, timestamp, recorder_id, gain, battery_v, temp_c,
			created_at, last_modified, active
		) VALUES (?, ?, ?, ?, ?, ?, now(), now(), true)
	`)
	if err != nil {
		return 0, 0, nil, fmt.Errorf("failed to prepare moth statement: %w", err)
	}
	defer mothStmt.Close()

	// recordFailure appends a per-file "insert" error formatted as "what: cause".
	recordFailure := func(fd *fileData, what string, cause error) {
		errors = append(errors, FileImportError{
			FileName: fd.FileName,
			Error:    fmt.Sprintf("%s: %v", what, cause),
			Stage:    "insert",
		})
	}

	// undo deletes the rows already written for fileID after a later insert
	// for the same file failed. linked reports whether the file_dataset row was
	// written as well. fileID is typed any so this does not assume
	// GenerateLongID's return type.
	undo := func(fileID any, linked bool) error {
		if linked {
			if _, delErr := tx.ExecContext(ctx, "DELETE FROM file_dataset WHERE file_id = ?", fileID); delErr != nil {
				return delErr
			}
		}
		_, delErr := tx.ExecContext(ctx, "DELETE FROM file WHERE id = ?", fileID)
		return delErr
	}

	for _, fd := range filesData {
		// Per-file errors get their own names so the named result err is
		// never shadowed inside the loop.
		var exists bool
		if scanErr := existsStmt.QueryRowContext(ctx, fd.Hash).Scan(&exists); scanErr != nil {
			recordFailure(fd, "duplicate check failed", scanErr)
			continue
		}
		if exists {
			skipped++
			continue
		}

		fileID, idErr := GenerateLongID()
		if idErr != nil {
			recordFailure(fd, "ID generation failed", idErr)
			continue
		}

		if _, execErr := fileStmt.ExecContext(ctx,
			fileID, fd.FileName, fd.Hash, locationID,
			fd.TimestampLocal, clusterID, fd.Duration, fd.SampleRate,
			fd.AstroData.SolarNight, fd.AstroData.CivilNight, fd.AstroData.MoonPhase,
		); execErr != nil {
			recordFailure(fd, "file insert failed", execErr)
			continue
		}

		if _, execErr := datasetStmt.ExecContext(ctx, fileID, datasetID); execErr != nil {
			recordFailure(fd, "file_dataset insert failed", execErr)
			if undoErr := undo(fileID, false); undoErr != nil {
				return 0, 0, errors, fmt.Errorf("failed to clean up partial insert of %s: %w", fd.FileName, undoErr)
			}
			continue
		}

		if fd.IsAudioMoth && fd.MothData != nil {
			if _, execErr := mothStmt.ExecContext(ctx,
				fileID,
				fd.MothData.Timestamp,
				&fd.MothData.RecorderID,
				&fd.MothData.Gain,
				&fd.MothData.BatteryV,
				&fd.MothData.TempC,
			); execErr != nil {
				recordFailure(fd, "moth_metadata insert failed", execErr)
				if undoErr := undo(fileID, true); undoErr != nil {
					return 0, 0, errors, fmt.Errorf("failed to clean up partial insert of %s: %w", fd.FileName, undoErr)
				}
				continue
			}
		}

		imported++
	}

	err = tx.Commit()
	if err != nil {
		return 0, 0, errors, fmt.Errorf("transaction commit failed: %w", err)
	}
	return imported, skipped, errors, nil
}