4VO5HC4R37PDGNAN6SX24X2M5G2V4RXIX2WCTFXDOH2TPRZA7DZQC RFLNV436GWTYJVP3DSVBOOUHZQRUFBBKNDQDYZM5EVXJN66A64MAC IFVRAERTCCDICNTYTG3TX2WASB6RXQQEJWWXQMQZJSQDQ3HLE5OQC 25GQ5TYGSGL7QED7L2IAPFLZ4WJJ2ZFAM6O6X5AOSTYPVJNCOGPQC VZGXBNYYO3E7EPFQ4GOLNVMRXXTQDDQZUU2BZ6JHNBDY4B2QLDAAC C3YEXRHPVZVGUJDZEUPDYWC5JZYBCSSC2ZHORSYSER5TICPX76WAC L4STQEXDGCPZXDHTEUBCOQKBMTFDRVXRLNFQHPDHOVXDCJO33LQQC OCRETPZZPDCUSOPYRH5MVRATJ37TRFGVSIMOI4IV755HFXXOVHEAC package utilsimport ("context""database/sql""fmt""os""path/filepath""sort""strings""time")// FileImportError records errors encountered during file processingtype FileImportError struct {FileName string `json:"file_name"`Error string `json:"error"`Stage string `json:"stage"` // "scan", "hash", "parse", "validate", "insert"}// ClusterImportInput defines parameters for importing one clustertype ClusterImportInput struct {FolderPath string // Absolute path to folder with WAV filesDatasetID string // 12-char dataset IDLocationID string // 12-char location IDClusterID string // 12-char cluster IDRecursive bool // Scan subfolders?}// ClusterImportOutput provides results and statisticstype ClusterImportOutput struct {TotalFiles intImportedFiles intSkippedFiles int // DuplicatesFailedFiles intAudioMothFiles intTotalDuration float64ProcessingTime stringErrors []FileImportError}// locationData holds location information needed for processingtype locationData struct {Latitude float64Longitude float64TimezoneID string}// fileData holds all data for a single file to be importedtype fileData struct {FileName stringHash stringDuration float64SampleRate intTimestampLocal time.TimeIsAudioMoth boolMothData *AudioMothDataAstroData AstronomicalData}// ImportCluster imports all WAV files from a folder into a cluster//// This is the canonical cluster import logic used by both:// - import_files.go (single cluster)// - bulk_file_import.go (multiple clusters)//// Steps:// 1. Validate folder exists// 2. Get location metadata (lat/lon/timezone) from database// 3. 
Scan folder for WAV files (recursive or not)// 4. Batch process all files:// - Parse WAV headers// - Batch parse filename timestamps (variance-based)// - Resolve timestamps (AudioMoth → filename)// - Calculate hashes// - Calculate astronomical data// 5. Batch insert in single transaction:// - Check duplicates// - INSERT INTO file// - INSERT INTO file_dataset (ALWAYS)// - INSERT INTO moth_metadata (if AudioMoth)// - All-or-nothing commit// 6. Return summary statistics//// Transaction behavior: ALL files succeed or ALL rollback// This preserves cluster integrity (cluster = complete recording session)func ImportCluster(database *sql.DB,input ClusterImportInput,) (*ClusterImportOutput, error) {startTime := time.Now()// Validate folder existsinfo, err := os.Stat(input.FolderPath)if err != nil {return nil, fmt.Errorf("folder not accessible: %w", err)}if !info.IsDir() {return nil, fmt.Errorf("path is not a directory: %s", input.FolderPath)}// Get location data for astronomical calculationslocationData, err := getLocationData(database, input.LocationID)if err != nil {return nil, fmt.Errorf("failed to get location data: %w", err)}// Scan folder for WAV fileswavFiles, err := scanClusterFiles(input.FolderPath, input.Recursive)if err != nil {return nil, fmt.Errorf("failed to scan folder: %w", err)}// If no files, return earlyif len(wavFiles) == 0 {return &ClusterImportOutput{TotalFiles: 0,ProcessingTime: time.Since(startTime).String(),Errors: []FileImportError{},}, nil}// Batch process all filesfilesData, processErrors := batchProcessFiles(wavFiles, locationData)// Batch insert into databaseimported, skipped, insertErrors, err := insertClusterFiles(database,filesData,input.DatasetID,input.ClusterID,input.LocationID,)if err != nil {return nil, fmt.Errorf("database insertion failed: %w", err)}// Combine all errorsallErrors := append(processErrors, insertErrors...)// Calculate summary statisticsaudiomothCount := 0totalDuration := 0.0for _, fd := range filesData {if fd.IsAudioMoth 
{audiomothCount++}totalDuration += fd.Duration}return &ClusterImportOutput{TotalFiles: len(wavFiles),ImportedFiles: imported,SkippedFiles: skipped,FailedFiles: len(allErrors),AudioMothFiles: audiomothCount,TotalDuration: totalDuration,ProcessingTime: time.Since(startTime).String(),Errors: allErrors,}, nil}// getLocationData retrieves location coordinates and timezonefunc getLocationData(database *sql.DB, locationID string) (*locationData, error) {var loc locationDataerr := database.QueryRow("SELECT latitude, longitude, timezone_id FROM location WHERE id = ?",locationID,).Scan(&loc.Latitude, &loc.Longitude, &loc.TimezoneID)if err != nil {return nil, fmt.Errorf("failed to query location data: %w", err)}return &loc, nil}// EnsureClusterPath sets the cluster's path field if it's currently emptyfunc EnsureClusterPath(database *sql.DB, clusterID, folderPath string) error {// Check if cluster already has a pathvar currentPath sql.NullStringerr := database.QueryRow("SELECT path FROM cluster WHERE id = ?", clusterID).Scan(¤tPath)if err != nil {return fmt.Errorf("failed to query cluster: %w", err)}// If path is already set, skipif currentPath.Valid && currentPath.String != "" {return nil}// Normalize folder pathnormalizedPath := NormalizeFolderPath(folderPath)// Update cluster with normalized path_, err = database.Exec("UPDATE cluster SET path = ?, last_modified = now() WHERE id = ?",normalizedPath,clusterID,)if err != nil {return fmt.Errorf("failed to update cluster path: %w", err)}return nil}// scanClusterFiles recursively scans a folder for WAV files, excluding Clips_* subfoldersfunc scanClusterFiles(rootPath string, recursive bool) ([]string, error) {var wavFiles []stringif recursive {err := filepath.Walk(rootPath, func(path string, info os.FileInfo, err error) error {if err != nil {return err}// Skip "Clips_*" directoriesif info.IsDir() && strings.HasPrefix(info.Name(), "Clips_") {return filepath.SkipDir}// Check for WAV filesif !info.IsDir() {ext := 
strings.ToLower(filepath.Ext(path))if ext == ".wav" && info.Size() > 0 {wavFiles = append(wavFiles, path)}}return nil})if err != nil {return nil, err}} else {// Non-recursive: scan only top levelentries, err := os.ReadDir(rootPath)if err != nil {return nil, err}for _, entry := range entries {if !entry.IsDir() {name := entry.Name()ext := strings.ToLower(filepath.Ext(name))if ext == ".wav" {path := filepath.Join(rootPath, name)if info, err := os.Stat(path); err == nil && info.Size() > 0 {wavFiles = append(wavFiles, path)}}}}}// Sort for consistent processing ordersort.Strings(wavFiles)return wavFiles, nil}// batchProcessFiles extracts metadata and calculates hashes for all filesfunc batchProcessFiles(wavFiles []string, location *locationData) ([]*fileData, []FileImportError) {var filesData []*fileDatavar errors []FileImportError// Step 1: Extract WAV metadata from all filestype wavInfo struct {path stringmetadata *WAVMetadataerr error}wavInfos := make([]wavInfo, len(wavFiles))for i, path := range wavFiles {metadata, err := ParseWAVHeader(path)wavInfos[i] = wavInfo{path: path, metadata: metadata, err: err}}// Step 2: Collect filenames for batch timestamp parsingvar filenamesForParsing []stringvar filenameIndices []intfor i, info := range wavInfos {if info.err != nil {errors = append(errors, FileImportError{FileName: filepath.Base(info.path),Error: info.err.Error(),Stage: "parse",})continue}// Check if file has timestamp filename formatif HasTimestampFilename(info.path) {filenamesForParsing = append(filenamesForParsing, filepath.Base(info.path))filenameIndices = append(filenameIndices, i)}}// Step 3: Parse filename timestamps in batch (if any)filenameTimestampMap := make(map[int]time.Time) // Maps file index to timestampif len(filenamesForParsing) > 0 {filenameTimestamps, err := ParseFilenameTimestamps(filenamesForParsing)if err != nil {// If batch parsing fails, record error for all filesfor _, idx := range filenameIndices {errors = append(errors, 
FileImportError{FileName: filepath.Base(wavInfos[idx].path),Error: fmt.Sprintf("filename timestamp parsing failed: %v", err),Stage: "parse",})}} else {// Apply timezone offsetadjustedTimestamps, err := ApplyTimezoneOffset(filenameTimestamps, location.TimezoneID)if err != nil {for _, idx := range filenameIndices {errors = append(errors, FileImportError{FileName: filepath.Base(wavInfos[idx].path),Error: fmt.Sprintf("timezone offset failed: %v", err),Stage: "parse",})}} else {// Build map from file index to timestampfor j, idx := range filenameIndices {filenameTimestampMap[idx] = adjustedTimestamps[j]}}}}// Step 4: Process each filefor i, info := range wavInfos {if info.err != nil {continue // Already recorded error}// Calculate hashhash, err := ComputeXXH64(info.path)if err != nil {errors = append(errors, FileImportError{FileName: filepath.Base(info.path),Error: fmt.Sprintf("hash calculation failed: %v", err),Stage: "hash",})continue}// Determine timestampvar timestampLocal time.Timevar isAudioMoth boolvar mothData *AudioMothData// Try AudioMoth comment firstif IsAudioMoth(info.metadata.Comment, info.metadata.Artist) {isAudioMoth = truemothData, err = ParseAudioMothComment(info.metadata.Comment)if err == nil {timestampLocal = mothData.Timestamp} else {// AudioMoth detected but parsing failed - try filenameerrors = append(errors, FileImportError{FileName: filepath.Base(info.path),Error: fmt.Sprintf("AudioMoth comment parsing failed: %v", err),Stage: "parse",})}}// If no AudioMoth timestamp, use filename timestampif timestampLocal.IsZero() {if ts, ok := filenameTimestampMap[i]; ok {timestampLocal = ts}}// If still no timestamp, skip fileif timestampLocal.IsZero() {errors = append(errors, FileImportError{FileName: filepath.Base(info.path),Error: "no timestamp available (not AudioMoth and filename not parseable)",Stage: "parse",})continue}// Calculate astronomical dataastroData := 
CalculateAstronomicalData(timestampLocal.UTC(),info.metadata.Duration,location.Latitude,location.Longitude,)// Add to resultsfilesData = append(filesData, &fileData{FileName: filepath.Base(info.path),Hash: hash,Duration: info.metadata.Duration,SampleRate: info.metadata.SampleRate,TimestampLocal: timestampLocal,IsAudioMoth: isAudioMoth,MothData: mothData,AstroData: astroData,})}return filesData, errors}// insertClusterFiles inserts all file data into database in a single transactionfunc insertClusterFiles(database *sql.DB,filesData []*fileData,datasetID, clusterID, locationID string,) (imported, skipped int, errors []FileImportError, err error) {// Begin transactionctx := context.Background()tx, err := database.BeginTx(ctx, nil)if err != nil {return 0, 0, nil, fmt.Errorf("failed to begin transaction: %w", err)}defer tx.Rollback() // Rollback if not committed// Prepare statementsfileStmt, err := tx.PrepareContext(ctx, `INSERT INTO file (id, file_name, xxh64_hash, location_id, timestamp_local,cluster_id, duration, sample_rate, maybe_solar_night, maybe_civil_night,moon_phase, created_at, last_modified, active) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, now(), now(), true)`)if err != nil {return 0, 0, nil, fmt.Errorf("failed to prepare file statement: %w", err)}defer fileStmt.Close()datasetStmt, err := tx.PrepareContext(ctx, `INSERT INTO file_dataset (file_id, dataset_id, created_at, last_modified)VALUES (?, ?, now(), now())`)if err != nil {return 0, 0, nil, fmt.Errorf("failed to prepare dataset statement: %w", err)}defer datasetStmt.Close()mothStmt, err := tx.PrepareContext(ctx, `INSERT INTO moth_metadata (file_id, timestamp, recorder_id, gain, battery_v, temp_c,created_at, last_modified, active) VALUES (?, ?, ?, ?, ?, ?, now(), now(), true)`)if err != nil {return 0, 0, nil, fmt.Errorf("failed to prepare moth statement: %w", err)}defer mothStmt.Close()// Insert each filefor _, fd := range filesData {// Check for duplicate hashvar exists boolerr = 
tx.QueryRowContext(ctx,"SELECT EXISTS(SELECT 1 FROM file WHERE xxh64_hash = ?)",fd.Hash,).Scan(&exists)if err != nil {errors = append(errors, FileImportError{FileName: fd.FileName,Error: fmt.Sprintf("duplicate check failed: %v", err),Stage: "insert",})continue}if exists {skipped++continue}// Generate file IDfileID, err := GenerateLongID()if err != nil {errors = append(errors, FileImportError{FileName: fd.FileName,Error: fmt.Sprintf("ID generation failed: %v", err),Stage: "insert",})continue}// Insert file record_, err = fileStmt.ExecContext(ctx,fileID, fd.FileName, fd.Hash, locationID,fd.TimestampLocal, clusterID, fd.Duration, fd.SampleRate,fd.AstroData.SolarNight, fd.AstroData.CivilNight, fd.AstroData.MoonPhase,)if err != nil {errors = append(errors, FileImportError{FileName: fd.FileName,Error: fmt.Sprintf("file insert failed: %v", err),Stage: "insert",})continue}// Insert file_dataset junction (ALWAYS)_, err = datasetStmt.ExecContext(ctx, fileID, datasetID)if err != nil {errors = append(errors, FileImportError{FileName: fd.FileName,Error: fmt.Sprintf("file_dataset insert failed: %v", err),Stage: "insert",})continue}// If AudioMoth, insert moth_metadataif fd.IsAudioMoth && fd.MothData != nil {_, err = mothStmt.ExecContext(ctx,fileID,fd.MothData.Timestamp,&fd.MothData.RecorderID,&fd.MothData.Gain,&fd.MothData.BatteryV,&fd.MothData.TempC,)if err != nil {errors = append(errors, FileImportError{FileName: fd.FileName,Error: fmt.Sprintf("moth_metadata insert failed: %v", err),Stage: "insert",})continue}}imported++}// Commit transactionerr = tx.Commit()if err != nil {return 0, 0, errors, fmt.Errorf("transaction commit failed: %w", err)}return imported, skipped, errors, nil}
Summary ImportSummary `json:"summary" jsonschema:"Import summary with counts and statistics"`FileIDs []string `json:"file_ids" jsonschema:"List of successfully imported file IDs"`Errors []FileImportError `json:"errors,omitempty" jsonschema:"Errors encountered during import (if any)"`
Summary ImportSummary `json:"summary" jsonschema:"Import summary with counts and statistics"`FileIDs []string `json:"file_ids" jsonschema:"List of successfully imported file IDs"`Errors []utils.FileImportError `json:"errors,omitempty" jsonschema:"Errors encountered during import (if any)"`
}// FileImportError records errors encountered during file processingtype FileImportError struct {FileName string `json:"file_name"`Error string `json:"error"`Stage string `json:"stage"` // "scan", "hash", "parse", "validate", "insert"}// fileData holds all data for a single file to be importedtype fileData struct {FileName stringHash stringDuration float64SampleRate intTimestampLocal time.TimeIsAudioMoth boolMothData *utils.AudioMothDataAstroData utils.AstronomicalData
return nil, output, fmt.Errorf("failed to scan folder: %w", err)}if len(wavFiles) == 0 {output = ImportAudioFilesOutput{Summary: ImportSummary{TotalFiles: 0,ProcessingTime: time.Since(startTime).String(),},FileIDs: []string{},Errors: []FileImportError{},}return &mcp.CallToolResult{}, output, nil
return nil, output, fmt.Errorf("failed to open database: %w", err)
// Phase 3.5: Set cluster path if emptyerr = ensureClusterPath(dbPath, input.ClusterID, input.FolderPath)
// Import the cluster (ALL THE LOGIC IS HERE)clusterOutput, err := utils.ImportCluster(database, utils.ClusterImportInput{FolderPath: input.FolderPath,DatasetID: input.DatasetID,LocationID: input.LocationID,ClusterID: input.ClusterID,Recursive: recursive,})
// Phase 4: Process all files (extract metadata, calculate hashes, etc.)filesData, errors := processFiles(wavFiles, locationData)// Phase 5: Insert into databaseimportedFiles, skippedFiles, insertErrors := insertFilesIntoDB(dbPath,filesData,input.DatasetID,input.ClusterID,input.LocationID,)// Combine all errorsallErrors := append(errors, insertErrors...)// Calculate summaryaudiomothCount := 0totalDuration := 0.0for _, fd := range filesData {if fd.IsAudioMoth {audiomothCount++}totalDuration += fd.Duration}summary := ImportSummary{TotalFiles: len(wavFiles),ImportedFiles: importedFiles,SkippedFiles: skippedFiles,FailedFiles: len(allErrors),AudioMothFiles: audiomothCount,TotalDuration: totalDuration,ProcessingTime: time.Since(startTime).String(),}// Collect file IDsfileIDs := make([]string, 0, len(filesData))// Note: File IDs are generated during insert, not tracked here// This would require refactoring insertFilesIntoDB to return IDs
// Map to MCP output format
Summary: summary,FileIDs: fileIDs,Errors: allErrors,
Summary: ImportSummary{TotalFiles: clusterOutput.TotalFiles,ImportedFiles: clusterOutput.ImportedFiles,SkippedFiles: clusterOutput.SkippedFiles,FailedFiles: clusterOutput.FailedFiles,AudioMothFiles: clusterOutput.AudioMothFiles,TotalDuration: clusterOutput.TotalDuration,ProcessingTime: time.Since(startTime).String(),},FileIDs: []string{}, // File IDs not tracked currentlyErrors: clusterOutput.Errors,
}// locationData holds location information needed for processingtype locationData struct {Latitude float64Longitude float64TimezoneID string}// getLocationData retrieves location coordinates and timezonefunc getLocationData(dbPath, locationID string) (*locationData, error) {database, err := db.OpenReadOnlyDB(dbPath)if err != nil {return nil, err}defer database.Close()var loc locationDataerr = database.QueryRow("SELECT latitude, longitude, timezone_id FROM location WHERE id = ?",locationID,).Scan(&loc.Latitude, &loc.Longitude, &loc.TimezoneID)if err != nil {return nil, fmt.Errorf("failed to query location data: %w", err)}return &loc, nil}// ensureClusterPath sets the cluster's path field if it's currently emptyfunc ensureClusterPath(dbPath, clusterID, folderPath string) error {database, err := db.OpenWriteableDB(dbPath)if err != nil {return fmt.Errorf("failed to open database: %w", err)}defer database.Close()// Check if cluster already has a pathvar currentPath sql.NullStringerr = database.QueryRow("SELECT path FROM cluster WHERE id = ?", clusterID).Scan(¤tPath)if err != nil {return fmt.Errorf("failed to query cluster: %w", err)}// If path is already set, skipif currentPath.Valid && currentPath.String != "" {return nil}// Normalize folder pathnormalizedPath := utils.NormalizeFolderPath(folderPath)// Update cluster with normalized path_, err = database.Exec("UPDATE cluster SET path = ?, last_modified = now() WHERE id = ?",normalizedPath,clusterID,)if err != nil {return fmt.Errorf("failed to update cluster path: %w", err)}return nil}// scanWAVFiles recursively scans a folder for WAV files, excluding Clips_* subfoldersfunc scanWAVFiles(rootPath string, recursive bool) ([]string, error) {var wavFiles []stringif recursive {err := filepath.Walk(rootPath, func(path string, info os.FileInfo, err error) error {if err != nil {return err}// Skip "Clips_*" directoriesif info.IsDir() && strings.HasPrefix(info.Name(), "Clips_") {return filepath.SkipDir}// Check for WAV filesif 
!info.IsDir() {ext := strings.ToLower(filepath.Ext(path))if ext == ".wav" && info.Size() > 0 {wavFiles = append(wavFiles, path)}}return nil})if err != nil {return nil, err}} else {// Non-recursive: scan only top levelentries, err := os.ReadDir(rootPath)if err != nil {return nil, err}for _, entry := range entries {if !entry.IsDir() {name := entry.Name()ext := strings.ToLower(filepath.Ext(name))if ext == ".wav" {path := filepath.Join(rootPath, name)if info, err := os.Stat(path); err == nil && info.Size() > 0 {wavFiles = append(wavFiles, path)}}}}}// Sort for consistent processing ordersort.Strings(wavFiles)return wavFiles, nil}// processFiles extracts metadata and calculates hashes for all filesfunc processFiles(wavFiles []string, location *locationData) ([]*fileData, []FileImportError) {var filesData []*fileDatavar errors []FileImportError// Step 1: Extract WAV metadata from all filestype wavInfo struct {path stringmetadata *utils.WAVMetadataerr error}wavInfos := make([]wavInfo, len(wavFiles))for i, path := range wavFiles {metadata, err := utils.ParseWAVHeader(path)wavInfos[i] = wavInfo{path: path, metadata: metadata, err: err}}// Step 2: Collect filenames for batch timestamp parsingvar filenamesForParsing []stringvar filenameIndices []intfor i, info := range wavInfos {if info.err != nil {errors = append(errors, FileImportError{FileName: filepath.Base(info.path),Error: info.err.Error(),Stage: "parse",})continue}// Check if file has timestamp filename formatif utils.HasTimestampFilename(info.path) {filenamesForParsing = append(filenamesForParsing, filepath.Base(info.path))filenameIndices = append(filenameIndices, i)}}// Step 3: Parse filename timestamps in batch (if any)filenameTimestampMap := make(map[int]time.Time) // Maps file index to timestampif len(filenamesForParsing) > 0 {filenameTimestamps, err := utils.ParseFilenameTimestamps(filenamesForParsing)if err != nil {// If batch parsing fails, record error for all filesfor _, idx := range filenameIndices {errors = 
append(errors, FileImportError{FileName: filepath.Base(wavInfos[idx].path),Error: fmt.Sprintf("filename timestamp parsing failed: %v", err),Stage: "parse",})}} else {// Apply timezone offsetadjustedTimestamps, err := utils.ApplyTimezoneOffset(filenameTimestamps, location.TimezoneID)if err != nil {for _, idx := range filenameIndices {errors = append(errors, FileImportError{FileName: filepath.Base(wavInfos[idx].path),Error: fmt.Sprintf("timezone offset failed: %v", err),Stage: "parse",})}} else {// Build map from file index to timestampfor j, idx := range filenameIndices {filenameTimestampMap[idx] = adjustedTimestamps[j]}}}}// Step 4: Process each filefor i, info := range wavInfos {if info.err != nil {continue // Already recorded error}// Calculate hashhash, err := utils.ComputeXXH64(info.path)if err != nil {errors = append(errors, FileImportError{FileName: filepath.Base(info.path),Error: fmt.Sprintf("hash calculation failed: %v", err),Stage: "hash",})continue}// Determine timestampvar timestampLocal time.Timevar isAudioMoth boolvar mothData *utils.AudioMothData// Try AudioMoth comment firstif utils.IsAudioMoth(info.metadata.Comment, info.metadata.Artist) {isAudioMoth = truemothData, err = utils.ParseAudioMothComment(info.metadata.Comment)if err == nil {timestampLocal = mothData.Timestamp} else {// AudioMoth detected but parsing failed - try filenameerrors = append(errors, FileImportError{FileName: filepath.Base(info.path),Error: fmt.Sprintf("AudioMoth comment parsing failed: %v", err),Stage: "parse",})}}// If no AudioMoth timestamp, use filename timestampif timestampLocal.IsZero() {if ts, ok := filenameTimestampMap[i]; ok {timestampLocal = ts}}// If still no timestamp, skip fileif timestampLocal.IsZero() {errors = append(errors, FileImportError{FileName: filepath.Base(info.path),Error: "no timestamp available (not AudioMoth and filename not parseable)",Stage: "parse",})continue}// Calculate astronomical dataastroData := 
utils.CalculateAstronomicalData(timestampLocal.UTC(),info.metadata.Duration,location.Latitude,location.Longitude,)// Add to resultsfilesData = append(filesData, &fileData{FileName: filepath.Base(info.path),Hash: hash,Duration: info.metadata.Duration,SampleRate: info.metadata.SampleRate,TimestampLocal: timestampLocal,IsAudioMoth: isAudioMoth,MothData: mothData,AstroData: astroData,})}return filesData, errors
// insertFilesIntoDB inserts all file data into database in a single transaction
//
// Opens a writable connection to dbPath, prepares INSERT statements for the
// file, file_dataset, and moth_metadata tables, then processes each entry of
// filesData: files whose xxh64 hash already exists are counted as skipped,
// and per-file failures are appended to errors while processing continues.
// Setup failures (open, begin, prepare, commit) also return via errors with
// imported and skipped reported as zero; the deferred Rollback discards any
// partial work when Commit is not reached.
// NOTE(review): on commit failure the imported count is reported as 0 even
// though inserts were attempted — confirm callers expect this.
func insertFilesIntoDB(
	dbPath string,
	filesData []*fileData,
	datasetID, clusterID, locationID string,
) (imported, skipped int, errors []FileImportError) {
	// Open writable database
	database, err := db.OpenWriteableDB(dbPath)
	if err != nil {
		errors = append(errors, FileImportError{
			FileName: "",
			Error:    fmt.Sprintf("failed to open database: %v", err),
			Stage:    "insert",
		})
		return 0, 0, errors
	}
	defer database.Close()

	// Begin transaction
	ctx := context.Background()
	tx, err := database.BeginTx(ctx, nil)
	if err != nil {
		errors = append(errors, FileImportError{
			FileName: "",
			Error:    fmt.Sprintf("failed to begin transaction: %v", err),
			Stage:    "insert",
		})
		return 0, 0, errors
	}
	defer tx.Rollback() // Rollback if not committed

	// Prepare statements (reused for every file in the batch)
	fileStmt, err := tx.PrepareContext(ctx, `INSERT INTO file (id, file_name, xxh64_hash, location_id, timestamp_local,cluster_id, duration, sample_rate, maybe_solar_night, maybe_civil_night,moon_phase, created_at, last_modified, active) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, now(), now(), true)`)
	if err != nil {
		errors = append(errors, FileImportError{
			FileName: "",
			Error:    fmt.Sprintf("failed to prepare file statement: %v", err),
			Stage:    "insert",
		})
		return 0, 0, errors
	}
	defer fileStmt.Close()

	datasetStmt, err := tx.PrepareContext(ctx, `INSERT INTO file_dataset (file_id, dataset_id, created_at, last_modified)VALUES (?, ?, now(), now())`)
	if err != nil {
		errors = append(errors, FileImportError{
			FileName: "",
			Error:    fmt.Sprintf("failed to prepare dataset statement: %v", err),
			Stage:    "insert",
		})
		return 0, 0, errors
	}
	defer datasetStmt.Close()

	mothStmt, err := tx.PrepareContext(ctx, `INSERT INTO moth_metadata (file_id, timestamp, recorder_id, gain, battery_v, temp_c,created_at, last_modified, active) VALUES (?, ?, ?, ?, ?, ?, now(), now(), true)`)
	if err != nil {
		errors = append(errors, FileImportError{
			FileName: "",
			Error:    fmt.Sprintf("failed to prepare moth statement: %v", err),
			Stage:    "insert",
		})
		return 0, 0, errors
	}
	defer mothStmt.Close()

	// Insert each file
	for _, fd := range filesData {
		// Check for duplicate hash (inside the transaction, so earlier
		// inserts from this same batch are visible)
		var exists bool
		err = tx.QueryRowContext(ctx,
			"SELECT EXISTS(SELECT 1 FROM file WHERE xxh64_hash = ?)",
			fd.Hash,
		).Scan(&exists)
		if err != nil {
			errors = append(errors, FileImportError{
				FileName: fd.FileName,
				Error:    fmt.Sprintf("duplicate check failed: %v", err),
				Stage:    "insert",
			})
			continue
		}
		if exists {
			skipped++
			continue
		}

		// Generate file ID
		fileID, err := utils.GenerateLongID()
		if err != nil {
			errors = append(errors, FileImportError{
				FileName: fd.FileName,
				Error:    fmt.Sprintf("ID generation failed: %v", err),
				Stage:    "insert",
			})
			continue
		}

		// Insert file record
		_, err = fileStmt.ExecContext(ctx,
			fileID, fd.FileName, fd.Hash, locationID,
			fd.TimestampLocal, clusterID, fd.Duration, fd.SampleRate,
			fd.AstroData.SolarNight, fd.AstroData.CivilNight, fd.AstroData.MoonPhase,
		)
		if err != nil {
			errors = append(errors, FileImportError{
				FileName: fd.FileName,
				Error:    fmt.Sprintf("file insert failed: %v", err),
				Stage:    "insert",
			})
			continue
		}

		// Insert file_dataset junction
		_, err = datasetStmt.ExecContext(ctx, fileID, datasetID)
		if err != nil {
			errors = append(errors, FileImportError{
				FileName: fd.FileName,
				Error:    fmt.Sprintf("file_dataset insert failed: %v", err),
				Stage:    "insert",
			})
			continue
		}

		// If AudioMoth, insert moth_metadata
		// NOTE(review): pointer-to-field args rely on database/sql's default
		// parameter converter dereferencing them — confirm driver support.
		if fd.IsAudioMoth && fd.MothData != nil {
			_, err = mothStmt.ExecContext(ctx,
				fileID,
				fd.MothData.Timestamp,
				&fd.MothData.RecorderID,
				&fd.MothData.Gain,
				&fd.MothData.BatteryV,
				&fd.MothData.TempC,
			)
			if err != nil {
				errors = append(errors, FileImportError{
					FileName: fd.FileName,
					Error:    fmt.Sprintf("moth_metadata insert failed: %v", err),
					Stage:    "insert",
				})
				continue
			}
		}

		imported++
	}

	// Commit transaction
	err = tx.Commit()
	if err != nil {
		errors = append(errors, FileImportError{
			FileName: "",
			Error:    fmt.Sprintf("transaction commit failed: %v", err),
			Stage:    "insert",
		})
		return 0, 0, errors
	}

	return imported, skipped, errors
}
}// locationData holds location information (local wrapper around utils types)type locationData struct {Latitude float64Longitude float64TimezoneID string}// fileData holds file metadata (local wrapper around utils types)type fileData struct {FileName stringHash stringDuration float64SampleRate intTimestampLocal time.TimeIsAudioMoth boolMothData *utils.AudioMothDataAstroData utils.AstronomicalData
// getLocationData retrieves location coordinates and timezone from databasefunc getLocationData(dbPath, locationID string) (*locationData, error) {database, err := db.OpenReadOnlyDB(dbPath)if err != nil {return nil, err}defer database.Close()var loc locationDataerr = database.QueryRow("SELECT latitude, longitude, timezone_id FROM location WHERE id = ?",locationID,).Scan(&loc.Latitude, &loc.Longitude, &loc.TimezoneID)if err != nil {return nil, fmt.Errorf("failed to query location data: %w", err)}return &loc, nil}// ensureClusterPath sets the cluster's path field if it's currently emptyfunc ensureClusterPath(dbPath, clusterID, folderPath string) error {database, err := db.OpenWriteableDB(dbPath)if err != nil {return fmt.Errorf("failed to open database: %w", err)}defer database.Close()return utils.EnsureClusterPath(database, clusterID, folderPath)}
// Get location datavar latitude, longitude float64var timezoneID stringerr := database.QueryRow(`SELECT latitude, longitude, timezone_idFROM locationWHERE id = ?`, locationID).Scan(&latitude, &longitude, &timezoneID)if err != nil {return nil, fmt.Errorf("failed to get location data: %v", err)
// Check if directory existsif _, err := os.Stat(folderPath); os.IsNotExist(err) {logger.Log(" WARNING: Directory not found, skipping")return stats, nil
// Find all WAV files recursivelyvar wavFiles []stringerr = filepath.Walk(folderPath, func(path string, info os.FileInfo, err error) error {if err != nil {return err}if !info.IsDir() && strings.HasSuffix(strings.ToLower(info.Name()), ".wav") {wavFiles = append(wavFiles, path)}return nil
// Import the cluster (SAME LOGIC AS import_files.go)logger.Log(" Importing cluster...")clusterOutput, err := utils.ImportCluster(database, utils.ClusterImportInput{FolderPath: folderPath,DatasetID: datasetID,LocationID: locationID,ClusterID: clusterID,Recursive: true,
stats.TotalFiles = len(wavFiles)if stats.TotalFiles == 0 {return stats, nil}
// Map to bulk import statsstats.TotalFiles = clusterOutput.TotalFilesstats.ImportedFiles = clusterOutput.ImportedFilesstats.DuplicateFiles = clusterOutput.SkippedFilesstats.ErrorFiles = clusterOutput.FailedFiles
// Process each filefor i, filePath := range wavFiles {// Log progress periodicallyif (i+1)%100 == 0 {logger.Log(" Processing file %d/%d...", i+1, len(wavFiles))
// Log errorsfor i, fileErr := range clusterOutput.Errors {if i < 5 { // Log first 5logger.Log(" ERROR: %s: %s", fileErr.FileName, fileErr.Error)
err := bulkImportSingleFile(database, filePath, datasetID, locationID, clusterID, latitude, longitude, timezoneID)if err != nil {if strings.Contains(err.Error(), "duplicate") {stats.DuplicateFiles++} else {stats.ErrorFiles++// Log first few errorsif stats.ErrorFiles <= 5 {logger.Log(" ERROR: %s: %v", filepath.Base(filePath), err)}}} else {stats.ImportedFiles++}}
logger.Log(" Complete: %d imported, %d duplicates, %d errors", stats.ImportedFiles, stats.DuplicateFiles, stats.ErrorFiles)
}// bulkImportSingleFile imports a single WAV file into the databasefunc bulkImportSingleFile(database *sql.DB, filePath, datasetID, locationID, clusterID string, latitude, longitude float64, timezoneID string) error {// Process file: WAV parse → hash → timestamp → astronomical dataresult, err := utils.ProcessSingleFile(filePath, latitude, longitude, timezoneID, false)if err != nil {return err}// Check for duplicate_, isDup, err := utils.CheckDuplicateHash(database, result.Hash)if err != nil {return err}if isDup {return fmt.Errorf("duplicate")}// Generate file IDfileID, err := utils.GenerateLongID()if err != nil {return fmt.Errorf("ID generation failed: %v", err)}// Insert into databasenow := time.Now().UTC()_, err = database.Exec(`INSERT INTO file (id, location_id, cluster_id,file_name, xxh64_hash, duration, sample_rate,timestamp_local, maybe_solar_night, maybe_civil_night,moon_phase,active, created_at, last_modified) VALUES (?, ?, ?,?, ?, ?, ?,?, ?, ?,?,true, ?, ?)`, fileID, locationID, clusterID,result.FileName, result.Hash, result.Duration, result.SampleRate,result.TimestampLocal, result.AstroData.SolarNight, result.AstroData.CivilNight,result.AstroData.MoonPhase,now, now)if err != nil {return fmt.Errorf("database insert failed: %v", err)}return nil