package utils
import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"os"
	"path/filepath"
	"time"
)
// ClusterImportInput describes a request to import a folder of WAV
// recordings into a dataset/cluster at a given location.
type ClusterImportInput struct {
	FolderPath string // root folder containing the .wav files
	DatasetID  string // dataset the imported files are attached to
	LocationID string // location row providing coordinates and timezone
	ClusterID  string // cluster the imported files belong to
	Recursive  bool   // descend into subdirectories when scanning
}
type ClusterImportOutput struct {
TotalFiles int
ImportedFiles int
SkippedFiles int FailedFiles int
AudioMothFiles int
TotalDuration float64
ProcessingTime string
Errors []FileImportError
}
// LocationData holds the coordinates and timezone of a recording
// location, as loaded from the location table by GetLocationData.
type LocationData struct {
Latitude float64 // latitude in decimal degrees
Longitude float64 // longitude in decimal degrees
TimezoneID string // timezone identifier — presumably an IANA name; confirm against location table contents
}
// ImportCluster scans input.FolderPath for .wav recordings and imports
// them into the dataset/cluster referenced by input, performing all
// inserts through tx. It returns per-file statistics plus the list of
// files that failed to process or insert; a non-nil error is returned
// only for failures that abort the whole import (inaccessible folder,
// missing location, scan failure, or statement preparation failure).
func ImportCluster(
	database DB,
	tx *sql.Tx,
	input ClusterImportInput,
) (*ClusterImportOutput, error) {
	begin := time.Now()

	// The import root must exist and actually be a directory.
	stat, err := os.Stat(input.FolderPath)
	if err != nil {
		return nil, fmt.Errorf("folder not accessible: %w", err)
	}
	if !stat.IsDir() {
		return nil, fmt.Errorf("path is not a directory: %s", input.FolderPath)
	}

	// Coordinates and timezone drive timestamp resolution and the
	// astronomical calculations further down.
	loc, err := GetLocationData(database, input.LocationID)
	if err != nil {
		return nil, fmt.Errorf("failed to get location data: %w", err)
	}

	paths, err := FindFiles(input.FolderPath, FindFilesOptions{
		Extension:    ".wav",
		Recursive:    input.Recursive,
		SkipPrefixes: []string{"Clips_"},
		SkipHidden:   true,
		MinSize:      1,
	})
	if err != nil {
		return nil, fmt.Errorf("failed to scan folder: %w", err)
	}

	// Nothing found: report an empty (but non-nil) error slice.
	if len(paths) == 0 {
		return &ClusterImportOutput{
			TotalFiles:     0,
			ProcessingTime: time.Since(begin).String(),
			Errors:         []FileImportError{},
		}, nil
	}

	processed, parseErrs := batchProcessFiles(paths, loc)

	imported, skipped, insertErrs, err := insertClusterFiles(
		tx,
		processed,
		input.DatasetID,
		input.ClusterID,
		input.LocationID,
	)
	if err != nil {
		return nil, fmt.Errorf("database insertion failed: %w", err)
	}

	combined := append(parseErrs, insertErrs...)

	// Aggregate per-file stats for the summary.
	mothCount := 0
	durationSum := 0.0
	for _, p := range processed {
		if p.IsAudioMoth {
			mothCount++
		}
		durationSum += p.Duration
	}

	return &ClusterImportOutput{
		TotalFiles:     len(paths),
		ImportedFiles:  imported,
		SkippedFiles:   skipped,
		FailedFiles:    len(combined),
		AudioMothFiles: mothCount,
		TotalDuration:  durationSum,
		ProcessingTime: time.Since(begin).String(),
		Errors:         combined,
	}, nil
}
// GetLocationData loads the latitude, longitude, and timezone of the
// location identified by locationID from the location table.
//
// A missing row yields a descriptive "location not found" error that
// still wraps sql.ErrNoRows (so errors.Is(err, sql.ErrNoRows) keeps
// working for callers); any other query error is wrapped as-is.
func GetLocationData(database DB, locationID string) (*LocationData, error) {
	var loc LocationData
	err := database.QueryRow(
		"SELECT latitude, longitude, timezone_id FROM location WHERE id = ?",
		locationID,
	).Scan(&loc.Latitude, &loc.Longitude, &loc.TimezoneID)
	if errors.Is(err, sql.ErrNoRows) {
		// Surface the offending ID instead of a bare "no rows" message.
		return nil, fmt.Errorf("location %s not found: %w", locationID, err)
	}
	if err != nil {
		return nil, fmt.Errorf("failed to query location data: %w", err)
	}
	return &loc, nil
}
func EnsureClusterPath(database DB, clusterID, folderPath string) error {
var currentPath sql.NullString
err := database.QueryRow("SELECT path FROM cluster WHERE id = ?", clusterID).Scan(¤tPath)
if err != nil {
return fmt.Errorf("failed to query cluster: %w", err)
}
if currentPath.Valid && currentPath.String != "" {
return nil
}
normalizedPath := NormalizeFolderPath(folderPath)
_, err = database.Exec(
"UPDATE cluster SET path = ?, last_modified = now() WHERE id = ?",
normalizedPath,
clusterID,
)
if err != nil {
return fmt.Errorf("failed to update cluster path: %w", err)
}
return nil
}
// wavInfo carries the per-file result of the WAV header-parsing stage:
// either metadata plus content hash, or the error that prevented
// parsing (in which case the other fields are unset).
type wavInfo struct {
path string // absolute or scan-relative path to the .wav file
metadata *WAVMetadata // parsed header metadata; nil when err is non-nil
hash string // content hash of the file (xxh64, per the file table column name)
err error // parse failure, if any
}
// parseFilenameTimestampsBatch parses the timestamps encoded in the
// given filenames in one batch and shifts them by the location's
// timezone offset. The returned map is keyed by wavInfos index. On a
// batch-level failure every affected file gets its own FileImportError
// and the (possibly empty) map is returned alongside the errors.
func parseFilenameTimestampsBatch(
	wavInfos []wavInfo,
	filenameIndices []int,
	filenames []string,
	timezoneID string,
) (map[int]time.Time, []FileImportError) {
	var errs []FileImportError
	out := make(map[int]time.Time)

	// Record the same batch failure once per affected file.
	failAll := func(format string, cause error) {
		for _, idx := range filenameIndices {
			errs = append(errs, FileImportError{
				FileName: filepath.Base(wavInfos[idx].path),
				Error:    fmt.Sprintf(format, cause),
				Stage:    StageParse,
			})
		}
	}

	parsed, err := ParseFilenameTimestamps(filenames)
	if err != nil {
		failAll("filename timestamp parsing failed: %v", err)
		return out, errs
	}

	shifted, err := ApplyTimezoneOffset(parsed, timezoneID)
	if err != nil {
		failAll("timezone offset failed: %v", err)
		return out, errs
	}

	// filenames, filenameIndices, and shifted are parallel slices.
	for j, idx := range filenameIndices {
		out[idx] = shifted[j]
	}
	return out, errs
}
// resolveFileData turns one parsed WAV header into a
// FileProcessingResult by resolving the recording timestamp (using the
// pre-parsed filename timestamp when available) and computing the
// astronomical data for the recording window at the given location.
func resolveFileData(info wavInfo, preParsedTime *time.Time, location *LocationData) (*FileProcessingResult, error) {
	ts, err := ResolveTimestamp(info.metadata, info.path, location.TimezoneID, true, preParsedTime)
	if err != nil {
		return nil, err
	}

	astro := CalculateAstronomicalData(
		ts.Timestamp.UTC(),
		info.metadata.Duration,
		location.Latitude,
		location.Longitude,
	)

	result := FileProcessingResult{
		FileName:       filepath.Base(info.path),
		Hash:           info.hash,
		Duration:       info.metadata.Duration,
		SampleRate:     info.metadata.SampleRate,
		TimestampLocal: ts.Timestamp,
		IsAudioMoth:    ts.IsAudioMoth,
		MothData:       ts.MothData,
		AstroData:      astro,
	}
	return &result, nil
}
// batchProcessFiles parses every WAV file, resolves per-file
// timestamps (batching filename-derived timestamps), and returns the
// successfully processed files plus one FileImportError for each file
// that failed at any stage.
func batchProcessFiles(wavFiles []string, location *LocationData) ([]*FileProcessingResult, []FileImportError) {
	var results []*FileProcessingResult
	var errs []FileImportError

	// Stage 1: read header metadata and content hash for every file.
	infos := make([]wavInfo, len(wavFiles))
	for i, p := range wavFiles {
		meta, hash, err := ParseWAVHeaderWithHash(p)
		infos[i] = wavInfo{path: p, metadata: meta, hash: hash, err: err}
	}

	// Stage 2: record parse failures, and collect the files whose names
	// encode a timestamp so those can be parsed in one batch.
	var batchNames []string
	var batchIdx []int
	for i, info := range infos {
		if info.err != nil {
			errs = append(errs, FileImportError{
				FileName: filepath.Base(info.path),
				Error:    info.err.Error(),
				Stage:    StageParse,
			})
			continue
		}
		if HasTimestampFilename(info.path) {
			batchNames = append(batchNames, filepath.Base(info.path))
			batchIdx = append(batchIdx, i)
		}
	}

	tsByIndex := make(map[int]time.Time)
	if len(batchNames) > 0 {
		m, tsErrs := parseFilenameTimestampsBatch(infos, batchIdx, batchNames, location.TimezoneID)
		errs = append(errs, tsErrs...)
		tsByIndex = m
	}

	// Stage 3: resolve the final timestamp and astronomical data for
	// each readable file.
	for i, info := range infos {
		if info.err != nil {
			continue
		}
		var pre *time.Time
		if ts, ok := tsByIndex[i]; ok {
			pre = &ts
		}
		fd, err := resolveFileData(info, pre, location)
		if err != nil {
			errs = append(errs, FileImportError{
				FileName: filepath.Base(info.path),
				Error:    err.Error(),
				Stage:    StageParse,
			})
			continue
		}
		results = append(results, fd)
	}

	return results, errs
}
// insertSingleFile writes one processed file into the file,
// file_dataset and (for AudioMoth recordings with metadata)
// moth_metadata tables. It reports (false, nil) when the file's hash
// already exists and the row is skipped, and (true, nil) on success.
func insertSingleFile(
	ctx context.Context,
	tx *sql.Tx,
	fd *FileProcessingResult,
	fileStmt, datasetStmt, mothStmt *sql.Stmt,
	datasetID, clusterID, locationID string,
) (bool, error) {
	// Skip files whose content hash is already in the database.
	_, dup, err := CheckDuplicateHash(tx, fd.Hash)
	if err != nil {
		return false, fmt.Errorf("duplicate check failed: %w", err)
	}
	if dup {
		return false, nil
	}

	fileID, err := GenerateLongID()
	if err != nil {
		return false, fmt.Errorf("ID generation failed: %w", err)
	}

	if _, err = fileStmt.ExecContext(ctx,
		fileID, fd.FileName, fd.Hash, locationID,
		fd.TimestampLocal, clusterID, fd.Duration, fd.SampleRate,
		fd.AstroData.SolarNight, fd.AstroData.CivilNight, fd.AstroData.MoonPhase,
	); err != nil {
		return false, fmt.Errorf("file insert failed: %w", err)
	}

	if _, err = datasetStmt.ExecContext(ctx, fileID, datasetID); err != nil {
		return false, fmt.Errorf("file_dataset insert failed: %w", err)
	}

	// AudioMoth recordings carry extra recorder metadata. The field
	// pointers are passed as-is (matching the original call shape).
	if fd.IsAudioMoth && fd.MothData != nil {
		if _, err = mothStmt.ExecContext(ctx,
			fileID,
			fd.MothData.Timestamp,
			&fd.MothData.RecorderID,
			&fd.MothData.Gain,
			&fd.MothData.BatteryV,
			&fd.MothData.TempC,
		); err != nil {
			return false, fmt.Errorf("moth_metadata insert failed: %w", err)
		}
	}

	return true, nil
}
// clusterStmts bundles the three prepared INSERT statements used for a
// cluster import. Release them with Close when the import is done.
type clusterStmts struct {
fileStmt *sql.Stmt // INSERT INTO file
datasetStmt *sql.Stmt // INSERT INTO file_dataset
mothStmt *sql.Stmt // INSERT INTO moth_metadata
}
// prepareClusterStmts prepares the three INSERT statements used by the
// cluster import within tx. If any preparation fails, the statements
// prepared so far are closed before the error is returned.
func prepareClusterStmts(ctx context.Context, tx *sql.Tx) (*clusterStmts, error) {
	// Track successfully prepared statements so the error paths can
	// release them without repeating cleanup code.
	var prepared []*sql.Stmt
	closeAll := func() {
		for _, s := range prepared {
			s.Close()
		}
	}

	fileStmt, err := tx.PrepareContext(ctx, `
INSERT INTO file (
id, file_name, xxh64_hash, location_id, timestamp_local,
cluster_id, duration, sample_rate, maybe_solar_night, maybe_civil_night,
moon_phase, created_at, last_modified, active
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, now(), now(), true)
`)
	if err != nil {
		return nil, fmt.Errorf("failed to prepare file statement: %w", err)
	}
	prepared = append(prepared, fileStmt)

	datasetStmt, err := tx.PrepareContext(ctx, `
INSERT INTO file_dataset (file_id, dataset_id, created_at, last_modified)
VALUES (?, ?, now(), now())
`)
	if err != nil {
		closeAll()
		return nil, fmt.Errorf("failed to prepare dataset statement: %w", err)
	}
	prepared = append(prepared, datasetStmt)

	mothStmt, err := tx.PrepareContext(ctx, `
INSERT INTO moth_metadata (
file_id, timestamp, recorder_id, gain, battery_v, temp_c,
created_at, last_modified, active
) VALUES (?, ?, ?, ?, ?, ?, now(), now(), true)
`)
	if err != nil {
		closeAll()
		return nil, fmt.Errorf("failed to prepare moth statement: %w", err)
	}

	return &clusterStmts{fileStmt: fileStmt, datasetStmt: datasetStmt, mothStmt: mothStmt}, nil
}
// Close releases all three prepared statements.
func (s *clusterStmts) Close() {
	for _, stmt := range []*sql.Stmt{s.fileStmt, s.datasetStmt, s.mothStmt} {
		stmt.Close()
	}
}
// insertClusterFiles inserts every processed file inside tx, counting
// successful imports and duplicate-hash skips. Per-file insert
// failures are collected as FileImportError values rather than
// aborting the batch; only a statement-preparation failure makes the
// function return a non-nil error.
func insertClusterFiles(
	tx *sql.Tx,
	filesData []*FileProcessingResult,
	datasetID, clusterID, locationID string,
) (imported, skipped int, errors []FileImportError, err error) {
	ctx := context.Background()

	stmts, err := prepareClusterStmts(ctx, tx)
	if err != nil {
		return 0, 0, nil, err
	}
	defer stmts.Close()

	for _, fd := range filesData {
		ok, insertErr := insertSingleFile(ctx, tx, fd, stmts.fileStmt, stmts.datasetStmt, stmts.mothStmt, datasetID, clusterID, locationID)
		switch {
		case insertErr != nil:
			errors = append(errors, FileImportError{
				FileName: fd.FileName,
				Error:    insertErr.Error(),
				Stage:    StageInsert,
			})
		case ok:
			imported++
		default:
			skipped++
		}
	}
	return imported, skipped, errors, nil
}