```go
database, err := db.OpenReadOnlyDB(dbPath)
if err != nil {
	return ExecuteSQLOutput{}, fmt.Errorf("database connection failed: %w", err)
}
defer database.Close()
```
```go
var output ExecuteSQLOutput
err := db.WithReadDB(dbPath, func(database *sql.DB) error {
	rows, rerr := executeSQLQuery(ctx, database, query, input.Parameters)
	if rerr != nil {
		return rerr
	}
	defer rows.Close()
```

```go
rows, err := executeSQLQuery(ctx, database, query, input.Parameters)
if err != nil {
	return ExecuteSQLOutput{}, err
}
defer rows.Close()
```

```go
	columnInfo, columns, cerr := buildColumnInfo(rows)
	if cerr != nil {
		return cerr
	}
```

```go
results, err := scanResultRows(rows, columns)
if err != nil {
	return ExecuteSQLOutput{}, err
}
```

```go
// Handle empty results (return empty array, not error)
if results == nil {
	results = []map[string]any{}
}
```

```go
	// Handle empty results (return empty array, not error)
	if results == nil {
		results = []map[string]any{}
	}

	// Detect truncation: if we auto-added limit+1 and got more than limit rows
	limited := false
	if autoAddedLimit && len(results) > limit {
		limited = true
		results = results[:limit]
	}
```

```go
// Detect truncation: if we auto-added limit+1 and got more than limit rows
limited := false
if autoAddedLimit && len(results) > limit {
	limited = true
	results = results[:limit]
}
```

```go
return ExecuteSQLOutput{
	Rows:     results,
	RowCount: len(results),
	Columns:  columnInfo,
	Limited:  limited,
	Query:    queryReported,
}, nil
```

```go
	output = ExecuteSQLOutput{
		Rows:     results,
		RowCount: len(results),
		Columns:  columnInfo,
		Limited:  limited,
		Query:    queryReported,
	}
	return nil
})
return output, err
```
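The `autoAddedLimit`/`limit` pair used above comes from a query-rewriting step that isn't shown in these hunks. A minimal sketch of how such a limit-injection helper could work; the name `ensureRowLimit` and the naive LIMIT check are assumptions for illustration, not the repository's actual code:

```go
// ensureRowLimit is a hypothetical sketch: when the query carries no LIMIT
// clause, append LIMIT limit+1 so the caller can detect truncation by checking
// whether more than limit rows came back.
func ensureRowLimit(query string, limit int) (rewritten string, autoAdded bool) {
	if strings.Contains(strings.ToUpper(query), " LIMIT ") {
		return query, false // caller supplied an explicit LIMIT; leave it alone
	}
	return fmt.Sprintf("%s LIMIT %d", query, limit+1), true
}
```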
```go
// Begin logged transaction
tx, err := db.BeginLoggedTx(ctx, database, "create_or_update_pattern")
if err != nil {
	return output, fmt.Errorf("failed to begin transaction: %w", err)
}
defer func() {
	if err != nil {
		tx.Rollback()
```

```go
err := db.WithWriteTx(ctx, dbPath, "create_or_update_pattern", func(database *sql.DB, tx *db.LoggedTx) error {
	// Check if pattern with same record_s/sleep_s already exists
	existing, found, ferr := findExistingPattern(ctx, tx, *input.RecordSeconds, *input.SleepSeconds)
	if ferr != nil {
		return fmt.Errorf("failed to check for existing pattern: %w", ferr)
```

```go
	}
}()

// Check if pattern with same record_s/sleep_s already exists
if existing, found, err := findExistingPattern(ctx, tx, *input.RecordSeconds, *input.SleepSeconds); err != nil {
	return output, fmt.Errorf("failed to check for existing pattern: %w", err)
} else if found {
	if err = tx.Commit(); err != nil {
		return output, fmt.Errorf("failed to commit transaction: %w", err)
```

```go
	if found {
		output.Pattern = existing
		output.Message = fmt.Sprintf("Pattern already exists with ID %s (record %ds, sleep %ds) - returning existing pattern",
			existing.ID, existing.RecordS, existing.SleepS)
		return nil // commit transaction
```

```go
	output.Pattern = existing
	output.Message = fmt.Sprintf("Pattern already exists with ID %s (record %ds, sleep %ds) - returning existing pattern",
		existing.ID, existing.RecordS, existing.SleepS)
	return output, nil
}

// Generate ID
id, err := utils.GenerateShortID()
if err != nil {
	return output, fmt.Errorf("failed to generate ID: %w", err)
}

// Insert pattern
_, err = tx.ExecContext(ctx,
	"INSERT INTO cyclic_recording_pattern (id, record_s, sleep_s, created_at, last_modified, active) VALUES (?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, TRUE)",
	id, *input.RecordSeconds, *input.SleepSeconds,
)
if err != nil {
	return output, fmt.Errorf("failed to create pattern: %w", err)
}
```

```go
// Fetch the created pattern
var pattern db.CyclicRecordingPattern
err = tx.QueryRowContext(ctx,
	"SELECT id, record_s, sleep_s, created_at, last_modified, active FROM cyclic_recording_pattern WHERE id = ?",
	id,
).Scan(&pattern.ID, &pattern.RecordS, &pattern.SleepS, &pattern.CreatedAt, &pattern.LastModified, &pattern.Active)
if err != nil {
	return output, fmt.Errorf("failed to fetch created pattern: %w", err)
}
```

```go
	// Generate ID
	id, gerr := utils.GenerateShortID()
	if gerr != nil {
		return fmt.Errorf("failed to generate ID: %w", gerr)
	}
```

```go
if err = tx.Commit(); err != nil {
	return output, fmt.Errorf("failed to commit transaction: %w", err)
}
```

```go
	// Insert pattern
	if _, err := tx.ExecContext(ctx,
		"INSERT INTO cyclic_recording_pattern (id, record_s, sleep_s, created_at, last_modified, active) VALUES (?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, TRUE)",
		id, *input.RecordSeconds, *input.SleepSeconds,
	); err != nil {
		return fmt.Errorf("failed to create pattern: %w", err)
	}
```

```go
output.Pattern = pattern
output.Message = fmt.Sprintf("Successfully created cyclic recording pattern with ID %s (record %ds, sleep %ds)",
	pattern.ID, pattern.RecordS, pattern.SleepS)
```

```go
	// Fetch the created pattern
	var pattern db.CyclicRecordingPattern
	if err := tx.QueryRowContext(ctx,
		"SELECT id, record_s, sleep_s, created_at, last_modified, active FROM cyclic_recording_pattern WHERE id = ?",
		id,
	).Scan(&pattern.ID, &pattern.RecordS, &pattern.SleepS, &pattern.CreatedAt, &pattern.LastModified, &pattern.Active); err != nil {
		return fmt.Errorf("failed to fetch created pattern: %w", err)
	}
```
```go
func updatePattern(ctx context.Context, input PatternInput) (PatternOutput, error) {
	var output PatternOutput
	if err := validateUpdatePatternInput(input); err != nil {
		return output, err
	}

	// Open writable database
	database, err := db.OpenWriteableDB(dbPath)
	if err != nil {
		return output, fmt.Errorf("failed to open database: %w", err)
	}
	defer database.Close()

	// Verify pattern exists and check active status
```

```go
// verifyPatternExistsAndActive checks that a pattern exists and is active.
func verifyPatternExistsAndActive(database *sql.DB, patternID string) error {
```

```go
// Begin logged transaction for update
tx, err := db.BeginLoggedTx(ctx, database, "create_or_update_pattern")
if err != nil {
	return output, fmt.Errorf("failed to begin transaction: %w", err)
```

```go
func updatePattern(ctx context.Context, input PatternInput) (PatternOutput, error) {
	var output PatternOutput
	if err := validateUpdatePatternInput(input); err != nil {
		return output, err
```

```go
// Fetch the updated pattern
var pattern db.CyclicRecordingPattern
err = tx.QueryRow(
	"SELECT id, record_s, sleep_s, created_at, last_modified, active FROM cyclic_recording_pattern WHERE id = ?",
	*input.ID,
).Scan(&pattern.ID, &pattern.RecordS, &pattern.SleepS, &pattern.CreatedAt, &pattern.LastModified, &pattern.Active)
if err != nil {
	return output, fmt.Errorf("failed to fetch updated pattern: %w", err)
}
```

```go
	if _, err := tx.Exec(query, args...); err != nil {
		return fmt.Errorf("failed to update pattern: %w", err)
	}
```

```go
if err = tx.Commit(); err != nil {
	return output, fmt.Errorf("failed to commit transaction: %w", err)
}
```

```go
	// Fetch the updated pattern
	var pattern db.CyclicRecordingPattern
	if err := tx.QueryRow(
		"SELECT id, record_s, sleep_s, created_at, last_modified, active FROM cyclic_recording_pattern WHERE id = ?",
		*input.ID,
	).Scan(&pattern.ID, &pattern.RecordS, &pattern.SleepS, &pattern.CreatedAt, &pattern.LastModified, &pattern.Active); err != nil {
		return fmt.Errorf("failed to fetch updated pattern: %w", err)
	}
```

```go
output.Pattern = pattern
output.Message = fmt.Sprintf("Successfully updated pattern (ID: %s, record %ds, sleep %ds)",
	pattern.ID, pattern.RecordS, pattern.SleepS)
return output, nil
```

```go
	output.Pattern = pattern
	output.Message = fmt.Sprintf("Successfully updated pattern (ID: %s, record %ds, sleep %ds)",
		pattern.ID, pattern.RecordS, pattern.SleepS)
	return nil
})
return output, err
```
```go
// Open writable database connection
database, err := db.OpenWriteableDB(dbPath)
if err != nil {
	return output, fmt.Errorf("database connection failed: %w", err)
}
defer database.Close()

// Begin logged transaction
tx, err := db.BeginLoggedTx(ctx, database, "create_or_update_location")
if err != nil {
	return output, fmt.Errorf("failed to begin transaction: %w", err)
}
defer func() {
	if err != nil {
		tx.Rollback()
```

```go
err := db.WithWriteTx(ctx, dbPath, "create_or_update_location", func(database *sql.DB, tx *db.LoggedTx) error {
	if err := verifyDatasetExistsAndActive(ctx, tx, *input.DatasetID); err != nil {
		return err
```

```go
if err := verifyDatasetExistsAndActive(ctx, tx, *input.DatasetID); err != nil {
	return output, err
}

// Check for existing location with same name in dataset (UNIQUE constraint)
var existingID string
err = tx.QueryRowContext(ctx,
	"SELECT id FROM location WHERE dataset_id = ? AND name = ? AND active = true",
	*input.DatasetID, *input.Name,
).Scan(&existingID)
if err == nil {
	return returnExistingLocation(ctx, tx, existingID, output)
}
```

```go
	// Check for existing location with same name in dataset (UNIQUE constraint)
	var existingID string
	qerr := tx.QueryRowContext(ctx,
		"SELECT id FROM location WHERE dataset_id = ? AND name = ? AND active = true",
		*input.DatasetID, *input.Name,
	).Scan(&existingID)
```

```go
// Generate ID
id, err := utils.GenerateShortID()
if err != nil {
	return output, fmt.Errorf("failed to generate ID: %w", err)
}
```

```go
	if qerr == nil {
		result, rerr := returnExistingLocation(ctx, tx, existingID)
		if rerr != nil {
			return rerr
		}
		output = result
		return nil
	}
```

```go
// Insert location
_, err = tx.ExecContext(ctx,
	"INSERT INTO location (id, dataset_id, name, latitude, longitude, timezone_id, description, created_at, last_modified, active) VALUES (?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, TRUE)",
	id, *input.DatasetID, *input.Name, *input.Latitude, *input.Longitude, *input.TimezoneID, input.Description,
)
if err != nil {
	return output, fmt.Errorf("failed to create location: %w", err)
}
```

```go
	// Generate ID
	id, gerr := utils.GenerateShortID()
	if gerr != nil {
		return fmt.Errorf("failed to generate ID: %w", gerr)
	}
```

```go
// Fetch the created location
location, err := fetchLocationByID(ctx, tx, id)
if err != nil {
	return output, fmt.Errorf("failed to fetch created location: %w", err)
}
```

```go
	// Insert location
	if _, err := tx.ExecContext(ctx,
		"INSERT INTO location (id, dataset_id, name, latitude, longitude, timezone_id, description, created_at, last_modified, active) VALUES (?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, TRUE)",
		id, *input.DatasetID, *input.Name, *input.Latitude, *input.Longitude, *input.TimezoneID, input.Description,
	); err != nil {
		return fmt.Errorf("failed to create location: %w", err)
	}
```

```go
if err = tx.Commit(); err != nil {
	return output, fmt.Errorf("failed to commit transaction: %w", err)
}
```

```go
	// Fetch the created location
	location, ferr := fetchLocationByID(ctx, tx, id)
	if ferr != nil {
		return fmt.Errorf("failed to fetch created location: %w", ferr)
	}
```

```go
output.Location = location
output.Message = fmt.Sprintf("Successfully created location '%s' with ID %s (%.6f, %.6f, %s)",
	location.Name, location.ID, location.Latitude, location.Longitude, location.TimezoneID)
return output, nil
```

```go
	output.Location = location
	output.Message = fmt.Sprintf("Successfully created location '%s' with ID %s (%.6f, %.6f, %s)",
		location.Name, location.ID, location.Latitude, location.Longitude, location.TimezoneID)
	return nil
})
return output, err
```
```go
func returnExistingLocation(ctx context.Context, tx *db.LoggedTx, existingID string, output LocationOutput) (LocationOutput, error) {
```

```go
// Caller is responsible for committing the transaction.
func returnExistingLocation(ctx context.Context, tx *db.LoggedTx, existingID string) (LocationOutput, error) {
	var output LocationOutput
```

```go
func verifyDatasetExistsAndActive(ctx context.Context, queryer interface {
	QueryRowContext(context.Context, string, ...any) *sql.Row
}, datasetID string) error {
	var exists, active bool
	err := queryer.QueryRowContext(ctx,
		"SELECT EXISTS(SELECT 1 FROM dataset WHERE id = ?), COALESCE((SELECT active FROM dataset WHERE id = ?), false)",
		datasetID, datasetID,
	).Scan(&exists, &active)
	if err != nil {
		return fmt.Errorf("failed to verify dataset: %w", err)
	}
	if !exists {
		return fmt.Errorf("dataset with ID '%s' does not exist", datasetID)
	}
	if !active {
		return fmt.Errorf("dataset (ID: %s) is not active", datasetID)
	}
	return nil
```

```go
func verifyDatasetExistsAndActive(ctx context.Context, q db.Querier, datasetID string) error {
	_, err := db.DatasetExistsAndActive(q, datasetID)
	return err
```
```go
// Open writable database
database, err := db.OpenWriteableDB(dbPath)
if err != nil {
	return output, fmt.Errorf("failed to open database: %w", err)
}
defer database.Close()
```

```go
err := db.WithWriteTx(ctx, dbPath, "create_or_update_location", func(database *sql.DB, tx *db.LoggedTx) error {
	if err := verifyLocationExistsAndActive(database, locationID); err != nil {
		return err
	}
```

```go
if err := verifyLocationExistsAndActive(database, locationID); err != nil {
	return output, err
}
```

```go
	// Verify dataset exists if DatasetID provided (relationship consistency)
	if input.DatasetID != nil {
		if err := verifyDatasetExistsAndActive(ctx, database, *input.DatasetID); err != nil {
			return err
		}
	}
```

```go
// Verify dataset exists if DatasetID provided (relationship consistency)
if input.DatasetID != nil {
	if err := verifyDatasetExistsAndActive(context.Background(), database, *input.DatasetID); err != nil {
		return output, err
```

```go
	updates, args, uerr := buildLocationUpdates(input, locationID)
	if uerr != nil {
		return uerr
```

```go
}

updates, args, err := buildLocationUpdates(input, locationID)
if err != nil {
	return output, err
}

query := fmt.Sprintf("UPDATE location SET %s WHERE id = ?", strings.Join(updates, ", "))
```

```go
	query := fmt.Sprintf("UPDATE location SET %s WHERE id = ?", strings.Join(updates, ", "))
```

```go
// Begin logged transaction for update
tx, err := db.BeginLoggedTx(ctx, database, "create_or_update_location")
if err != nil {
	return output, fmt.Errorf("failed to begin transaction: %w", err)
}
defer func() {
	if err != nil {
		tx.Rollback()
```

```go
	if _, err := tx.ExecContext(ctx, query, args...); err != nil {
		return fmt.Errorf("failed to update location: %w", err)
```

```go
}()

_, err = tx.ExecContext(ctx, query, args...)
if err != nil {
	return output, fmt.Errorf("failed to update location: %w", err)
}

// Fetch the updated location
location, err := fetchLocationByID(ctx, tx, locationID)
if err != nil {
	return output, fmt.Errorf("failed to fetch updated location: %w", err)
}
```

```go
if err = tx.Commit(); err != nil {
	return output, fmt.Errorf("failed to commit transaction: %w", err)
}
```

```go
	// Fetch the updated location
	location, ferr := fetchLocationByID(ctx, tx, locationID)
	if ferr != nil {
		return fmt.Errorf("failed to fetch updated location: %w", ferr)
	}
```

```go
output.Location = location
output.Message = fmt.Sprintf("Successfully updated location '%s' (ID: %s)", location.Name, location.ID)
return output, nil
```

```go
	output.Location = location
	output.Message = fmt.Sprintf("Successfully updated location '%s' (ID: %s)", location.Name, location.ID)
	return nil
})
return output, err
```
```go
// Begin logged transaction
tx, err := db.BeginLoggedTx(ctx, database, "import_unstructured")
if err != nil {
	return output, fmt.Errorf("failed to begin transaction: %w", err)
}
defer func() {
	if err != nil {
		tx.Rollback()
	}
}()
```

```go
err := db.WithWriteTx(ctx, dbPath, "import_unstructured", func(database *sql.DB, tx *db.LoggedTx) error {
	// Process each file
	for _, filePath := range files {
		fileResult, procErr := processUnstructuredFile(tx, filePath, input.DatasetID)
```

```go
// Process each file
for _, filePath := range files {
	fileResult, procErr := processUnstructuredFile(tx, filePath, input.DatasetID)
```

```go
		if procErr != nil {
			output.FailedFiles++
			output.Errors = append(output.Errors, utils.FileImportError{
				FileName: filepath.Base(filePath),
				Error:    procErr.Error(),
				Stage:    utils.StageProcess,
			})
			continue
		}
```

```go
	if procErr != nil {
		output.FailedFiles++
		output.Errors = append(output.Errors, utils.FileImportError{
			FileName: filepath.Base(filePath),
			Error:    procErr.Error(),
			Stage:    utils.StageProcess,
		})
		continue
```

```go
		if fileResult.Skipped {
			output.SkippedFiles++
		} else {
			output.ImportedFiles++
			output.TotalDuration += fileResult.Duration
		}
```
```go
// Verify dataset exists and is active
var datasetExists bool
err = database.QueryRow(
	"SELECT EXISTS(SELECT 1 FROM dataset WHERE id = ? AND active = true)",
	input.DatasetID,
).Scan(&datasetExists)
if err != nil {
	return fmt.Errorf("failed to query dataset: %w", err)
}
if !datasetExists {
	return fmt.Errorf("dataset not found or inactive: %s", input.DatasetID)
}
```

```go
return db.WithReadDB(dbPath, func(database *sql.DB) error {
	// Verify dataset exists and is active
	if _, err := db.DatasetExistsAndActive(database, input.DatasetID); err != nil {
		return err
	}
```

```go
// Verify dataset is 'unstructured' type
if err := db.ValidateDatasetTypeUnstructured(database, input.DatasetID); err != nil {
	return err
}
```

```go
	// Verify dataset is 'unstructured' type
	if err := db.ValidateDatasetTypeUnstructured(database, input.DatasetID); err != nil {
		return err
	}
```

```go
var datasetType string
err := dbConn.QueryRow(`SELECT type FROM dataset WHERE id = ? AND active = true`, datasetID).Scan(&datasetType)
if err == sql.ErrNoRows {
	return fmt.Errorf("dataset not found: %s", datasetID)
}
if err != nil {
	return fmt.Errorf("failed to query dataset: %w", err)
```

```go
if err := db.ValidateDatasetTypeForImport(dbConn, datasetID); err != nil {
	return err
```

```go
var locationExists bool
err = dbConn.QueryRow(`SELECT EXISTS(SELECT 1 FROM location WHERE id = ? AND dataset_id = ? AND active = true)`, locationID, datasetID).Scan(&locationExists)
if err != nil {
	return fmt.Errorf("failed to query location: %w", err)
```

```go
if err := db.ValidateLocationBelongsToDataset(dbConn, locationID, datasetID); err != nil {
	return err
```

```go
var clusterExists bool
err = dbConn.QueryRow(`SELECT EXISTS(SELECT 1 FROM cluster WHERE id = ? AND location_id = ? AND active = true)`, clusterID, locationID).Scan(&clusterExists)
if err != nil {
	return fmt.Errorf("failed to query cluster: %w", err)
}
if !clusterExists {
	return fmt.Errorf("cluster not found or not linked to location: %s", clusterID)
```

```go
if err := db.ClusterBelongsToLocation(dbConn, clusterID, locationID); err != nil {
	return err
```
```go
// Open database for validation queries
database, err := db.OpenReadOnlyDB(dbPath)
if err != nil {
	return fmt.Errorf("failed to open database: %w", err)
}
defer database.Close()
```

```go
return db.WithReadDB(dbPath, func(database *sql.DB) error {
	// Verify dataset exists, is active, and is 'structured' type
	if err := db.ValidateDatasetTypeForImport(database, datasetID); err != nil {
		return err
	}
```

```go
// Verify dataset exists and is active
var datasetExists bool
err = database.QueryRow("SELECT EXISTS(SELECT 1 FROM dataset WHERE id = ? AND active = true)", datasetID).Scan(&datasetExists)
if err != nil {
	return fmt.Errorf("failed to query dataset: %w", err)
}
if !datasetExists {
	return fmt.Errorf("dataset not found or inactive: %s", datasetID)
}
```

```go
	// Verify location exists and belongs to dataset
	if err := db.ValidateLocationBelongsToDataset(database, locationID, datasetID); err != nil {
		return err
	}
```

```go
// Verify dataset is 'structured' type (file imports only support structured datasets)
if err := db.ValidateDatasetTypeForImport(database, datasetID); err != nil {
	return err
}

// Verify location exists and belongs to dataset
var locationDatasetID string
err = database.QueryRow("SELECT dataset_id FROM location WHERE id = ? AND active = true", locationID).Scan(&locationDatasetID)
if err == sql.ErrNoRows {
	return fmt.Errorf("location not found or inactive: %s", locationID)
}
if err != nil {
	return fmt.Errorf("failed to query location: %w", err)
}
if locationDatasetID != datasetID {
	return fmt.Errorf("location %s does not belong to dataset %s", locationID, datasetID)
}

// Verify cluster exists and belongs to location
var clusterLocationID string
err = database.QueryRow("SELECT location_id FROM cluster WHERE id = ? AND active = true", clusterID).Scan(&clusterLocationID)
if err == sql.ErrNoRows {
	return fmt.Errorf("cluster not found or inactive: %s", clusterID)
}
if err != nil {
	return fmt.Errorf("failed to query cluster: %w", err)
}
if clusterLocationID != locationID {
	return fmt.Errorf("cluster %s does not belong to location %s", clusterID, locationID)
}
```

```go
	// Verify cluster exists and belongs to location
	if err := db.ClusterBelongsToLocation(database, clusterID, locationID); err != nil {
		return err
	}
```
```go
// Open writable database connection
database, err := db.OpenWriteableDB(dbPath)
if err != nil {
	return output, fmt.Errorf("database connection failed: %w", err)
}
defer database.Close()
```

```go
err = db.WithWriteTx(ctx, dbPath, "create_or_update_dataset", func(database *sql.DB, tx *db.LoggedTx) error {
	// Check for existing dataset with same name (UNIQUE constraint)
	var existingID string
	qerr := tx.QueryRowContext(ctx,
		"SELECT id FROM dataset WHERE name = ? AND active = true",
		*input.Name,
	).Scan(&existingID)
```

```go
// Begin logged transaction
tx, err := db.BeginLoggedTx(ctx, database, "create_or_update_dataset")
if err != nil {
	return output, fmt.Errorf("failed to begin transaction: %w", err)
}
defer func() {
	if err != nil {
		tx.Rollback()
```

```go
	if qerr == nil {
		result, herr := handleExistingDataset(ctx, tx, existingID)
		if herr != nil {
			return herr
		}
		output = result
		return nil
```

```go
// Check for existing dataset with same name (UNIQUE constraint)
var existingID string
err = tx.QueryRowContext(ctx,
	"SELECT id FROM dataset WHERE name = ? AND active = true",
	*input.Name,
).Scan(&existingID)
if err == nil {
	return handleExistingDataset(ctx, tx, existingID)
}

return insertNewDataset(ctx, tx, *input.Name, input.Description, datasetType)
```

```go
	result, insErr := insertNewDataset(ctx, tx, *input.Name, input.Description, datasetType)
	if insErr != nil {
		return insErr
	}
	output = result
	return nil
})
return output, err
```
```go
// Open writable database
database, err := db.OpenWriteableDB(dbPath)
if err != nil {
	return output, fmt.Errorf("failed to open database: %w", err)
}
defer database.Close()
```

```go
err := db.WithWriteTx(ctx, dbPath, "create_or_update_dataset", func(database *sql.DB, tx *db.LoggedTx) error {
	// Verify dataset exists and check active status
	if err := verifyDatasetActive(database, datasetID); err != nil {
		return err
	}
```

```go
// Begin logged transaction for update
tx, err := db.BeginLoggedTx(ctx, database, "create_or_update_dataset")
if err != nil {
	return output, fmt.Errorf("failed to begin transaction: %w", err)
}
defer func() {
```

```go
	// Fetch the updated dataset
	var dataset db.Dataset
	err := tx.QueryRow(
		"SELECT id, name, description, created_at, last_modified, active, type FROM dataset WHERE id = ?",
		datasetID,
	).Scan(&dataset.ID, &dataset.Name, &dataset.Description, &dataset.CreatedAt, &dataset.LastModified, &dataset.Active, &dataset.Type)
```

```go
}()

_, err = tx.Exec(query, args...)
if err != nil {
	return output, fmt.Errorf("failed to update dataset: %w", err)
}

// Fetch the updated dataset
var dataset db.Dataset
err = tx.QueryRow(
	"SELECT id, name, description, created_at, last_modified, active, type FROM dataset WHERE id = ?",
	datasetID,
).Scan(&dataset.ID, &dataset.Name, &dataset.Description, &dataset.CreatedAt, &dataset.LastModified, &dataset.Active, &dataset.Type)
if err != nil {
	return output, fmt.Errorf("failed to fetch updated dataset: %w", err)
}

if err = tx.Commit(); err != nil {
	return output, fmt.Errorf("failed to commit transaction: %w", err)
}

output.Dataset = dataset
output.Message = fmt.Sprintf("Successfully updated dataset '%s' (ID: %s)", dataset.Name, dataset.ID)
```
```go
// Open writable database connection
database, err := db.OpenWriteableDB(dbPath)
if err != nil {
	return output, fmt.Errorf("database connection failed: %w", err)
}
defer database.Close()

// Begin logged transaction
tx, err := db.BeginLoggedTx(ctx, database, "create_or_update_cluster")
if err != nil {
	return output, fmt.Errorf("failed to begin transaction: %w", err)
}
defer func() {
	if err != nil {
		tx.Rollback()
```

```go
err := db.WithWriteTx(ctx, dbPath, "create_or_update_cluster", func(database *sql.DB, tx *db.LoggedTx) error {
	// Verify parent references exist and are active
	datasetName, locationName, verr := verifyClusterParentRefs(ctx, tx, input)
	if verr != nil {
		return verr
```

```go
// Verify parent references exist and are active
datasetName, locationName, err := verifyClusterParentRefs(ctx, tx, input)
if err != nil {
	return output, err
}
```

```go
	// Check for existing cluster with same name in location (UNIQUE constraint)
	existing, findErr := findExistingClusterInLocation(ctx, tx, *input.LocationID, *input.Name)
	if findErr == nil {
		output.Cluster = existing
		output.Message = fmt.Sprintf("Cluster '%s' already exists in location '%s' (ID: %s) - returning existing cluster", existing.Name, locationName, existing.ID)
		return nil // commit transaction
	}
```

```go
// Check for existing cluster with same name in location (UNIQUE constraint)
existing, err := findExistingClusterInLocation(ctx, tx, *input.LocationID, *input.Name)
if err == nil {
	if err = tx.Commit(); err != nil {
		return output, fmt.Errorf("failed to commit transaction: %w", err)
```

```go
	result, insErr := insertNewCluster(ctx, tx, input, datasetName, locationName)
	if insErr != nil {
		return insErr
```

```go
	output.Cluster = existing
	output.Message = fmt.Sprintf("Cluster '%s' already exists in location '%s' (ID: %s) - returning existing cluster", existing.Name, locationName, existing.ID)
	return output, nil
}

return insertNewCluster(ctx, tx, input, datasetName, locationName)
```

```go
	output = result
	return nil // commit transaction
})
return output, err
```
```go
// verifyDatasetForCluster verifies dataset exists and is active within a transaction
func verifyDatasetForCluster(ctx context.Context, tx *db.LoggedTx, datasetID string) (string, error) {
	var exists, active bool
	var name string
	err := tx.QueryRowContext(ctx,
		"SELECT EXISTS(SELECT 1 FROM dataset WHERE id = ?), COALESCE((SELECT active FROM dataset WHERE id = ?), false), COALESCE((SELECT name FROM dataset WHERE id = ?), '')",
		datasetID, datasetID, datasetID,
	).Scan(&exists, &active, &name)
	if err != nil {
		return "", fmt.Errorf("failed to verify dataset: %w", err)
	}
	if !exists {
		return "", fmt.Errorf("dataset with ID '%s' does not exist", datasetID)
	}
	if !active {
		return "", fmt.Errorf("dataset '%s' (ID: %s) is not active", name, datasetID)
	}
	return name, nil
}

// verifyLocationForCluster verifies location exists, is active, and belongs to the dataset
func verifyLocationForCluster(ctx context.Context, tx *db.LoggedTx, locationID, datasetID, datasetName string) (string, error) {
	var exists, active bool
	var name, locDatasetID string
	err := tx.QueryRowContext(ctx,
		"SELECT EXISTS(SELECT 1 FROM location WHERE id = ?), COALESCE((SELECT active FROM location WHERE id = ?), false), COALESCE((SELECT name FROM location WHERE id = ?), ''), COALESCE((SELECT dataset_id FROM location WHERE id = ?), '')",
		locationID, locationID, locationID, locationID,
	).Scan(&exists, &active, &name, &locDatasetID)
	if err != nil {
		return "", fmt.Errorf("failed to verify location: %w", err)
	}
	if !exists {
		return "", fmt.Errorf("location with ID '%s' does not exist", locationID)
	}
	if !active {
		return "", fmt.Errorf("location '%s' (ID: %s) is not active", name, locationID)
	}
	if locDatasetID != datasetID {
		return "", fmt.Errorf("location '%s' (ID: %s) does not belong to dataset '%s' (ID: %s) - it belongs to dataset ID '%s'",
			name, locationID, datasetName, datasetID, locDatasetID)
	}
	return name, nil
}
```
```go
database, err := db.OpenWriteableDB(dbPath)
if err != nil {
	return output, fmt.Errorf("failed to open database: %w", err)
}
defer database.Close()

if err := validateClusterActive(database, clusterID); err != nil {
	return output, err
}
if err := validateClusterCyclicPattern(database, input); err != nil {
	return output, err
}

query, args, err := buildClusterUpdateQuery(input, clusterID)
if err != nil {
	return output, err
}
```

```go
err = db.WithWriteTx(ctx, dbPath, "create_or_update_cluster", func(database *sql.DB, tx *db.LoggedTx) error {
	if err := validateClusterActive(database, clusterID); err != nil {
		return err
	}
```

```go
tx, err := db.BeginLoggedTx(ctx, database, "create_or_update_cluster")
if err != nil {
	return output, fmt.Errorf("failed to begin transaction: %w", err)
}
defer func() {
	if err != nil {
		tx.Rollback()
```

```go
	if err := validateClusterCyclicPattern(database, input); err != nil {
		return err
```

```go
if _, err = tx.Exec(query, args...); err != nil {
	return output, fmt.Errorf("failed to update cluster: %w", err)
}
```

```go
	query, args, qerr := buildClusterUpdateQuery(input, clusterID)
	if qerr != nil {
		return qerr
	}
```

```go
cluster, err := fetchClusterByID(ctx, tx, clusterID)
if err != nil {
	return output, fmt.Errorf("failed to fetch updated cluster: %w", err)
}
```

```go
	if _, err := tx.Exec(query, args...); err != nil {
		return fmt.Errorf("failed to update cluster: %w", err)
	}
```

```go
output.Cluster = cluster
output.Message = fmt.Sprintf("Successfully updated cluster '%s' (ID: %s)", cluster.Name, cluster.ID)
return output, nil
```

```go
	output.Cluster = cluster
	output.Message = fmt.Sprintf("Successfully updated cluster '%s' (ID: %s)", cluster.Name, cluster.ID)
	return nil
})
return output, err
```
```go
}

// RavenSelection represents a single Raven selection
type RavenSelection struct {
	StartTime float64
	EndTime   float64
	FreqLow   float64
	FreqHigh  float64
	Species   string
}

// ravenJob represents a single Raven file to process
type ravenJob struct {
	ravenFile string
}

// ravenResult represents the result of processing a single Raven file
type ravenResult struct {
	ravenFile string
	calls     []ClusteredCall
	written   bool
	skipped   bool
	err       error
}

func (r ravenResult) filePath() string          { return r.ravenFile }
func (r ravenResult) getCalls() []ClusteredCall { return r.calls }
func (r ravenResult) wasWritten() bool          { return r.written }
func (r ravenResult) wasSkipped() bool          { return r.skipped }
func (r ravenResult) getError() error           { return r.err }

// CallsFromRaven processes Raven selection files and writes .data files
func CallsFromRaven(input CallsFromRavenInput) (CallsFromRavenOutput, error) {
	var output CallsFromRavenOutput
	output.Filter = "Raven"

	// Collect Raven files to process
	var ravenFiles []string
	if input.File != "" {
		ravenFiles = []string{input.File}
	} else if input.Folder != "" {
		var err error
		ravenFiles, err = findRavenFiles(input.Folder)
		if err != nil {
			errMsg := fmt.Sprintf("Failed to find Raven files: %v", err)
			output.Error = &errMsg
			return output, fmt.Errorf("%s", errMsg)
		}
	} else {
		errMsg := "Either --folder or --file must be specified"
		output.Error = &errMsg
		return output, fmt.Errorf("%s", errMsg)
	}

	if len(ravenFiles) == 0 {
		errMsg := "No Raven files found"
		output.Error = &errMsg
		return output, fmt.Errorf("%s", errMsg)
	}

	// Single file or small batch: process sequentially (avoid goroutine overhead)
	if len(ravenFiles) < 10 {
		return callsFromRavenSequential(input, ravenFiles)
	}

	// Large batch: parallel processing with DirCache
	return callsFromRavenParallel(input, ravenFiles)
```

```go
// callsFromRavenSequential processes Raven files one at a time (for small batches)
func callsFromRavenSequential(input CallsFromRavenInput, ravenFiles []string) (CallsFromRavenOutput, error) {
	var output CallsFromRavenOutput
	output.Filter = "Raven"

	// Build DirCache once for the folder (even sequential benefits from avoiding repeated dir scans)
	dirCaches := make(map[string]*DirCache)
	if input.Folder != "" {
		dirCaches[input.Folder] = NewDirCache(input.Folder)
	}

	speciesCount := make(map[string]int)
	var allCalls []ClusteredCall
	dataFilesWritten := 0
	dataFilesSkipped := 0
	filesProcessed := 0
	filesDeleted := 0

	for _, ravenFile := range ravenFiles {
		dir := filepath.Dir(ravenFile)
		cache := dirCaches[dir]
		if cache == nil {
			cache = NewDirCache(dir)
			dirCaches[dir] = cache
		}

		calls, written, skipped, err := processRavenFileCached(ravenFile, cache)
		if err != nil {
			errMsg := fmt.Sprintf("Error processing %s: %v", ravenFile, err)
			output.Error = &errMsg
			return output, fmt.Errorf("%s", errMsg)
		}
		if written {
			dataFilesWritten++
		}
		if skipped {
			dataFilesSkipped++
		}
```

```go
		// Delete if requested and successfully processed
		if input.Delete && written {
			if err := os.Remove(ravenFile); err != nil {
				errMsg := fmt.Sprintf("Failed to delete %s: %v", ravenFile, err)
				output.Error = &errMsg
				return output, fmt.Errorf("%s", errMsg)
			}
			filesDeleted++
		}

		if input.ProgressHandler != nil {
			input.ProgressHandler(filesProcessed, len(ravenFiles), filepath.Base(ravenFile))
		}
	}

	// Sort all calls by file, then start time
	sort.Slice(allCalls, func(i, j int) bool {
		if allCalls[i].File != allCalls[j].File {
			return allCalls[i].File < allCalls[j].File
		}
		return allCalls[i].StartTime < allCalls[j].StartTime
	})

	output.Calls = allCalls
	output.TotalCalls = len(allCalls)
	output.SpeciesCount = speciesCount
	output.DataFilesWritten = dataFilesWritten
	output.DataFilesSkipped = dataFilesSkipped
	output.FilesProcessed = filesProcessed
	output.FilesDeleted = filesDeleted
	return output, nil
}

// callsFromRavenParallel processes Raven files concurrently using a worker pool and DirCache
func callsFromRavenParallel(input CallsFromRavenInput, ravenFiles []string) (CallsFromRavenOutput, error) {
	var output CallsFromRavenOutput
	output.Filter = "Raven"

	total := len(ravenFiles)
	var processed atomic.Int32

	// Build DirCache for the folder
	dirCaches := &sync.Map{}
	if input.Folder != "" {
		cache := NewDirCache(input.Folder)
		dirCaches.Store(input.Folder, cache)
	}

	// Create job and result channels
	jobs := make(chan ravenJob, total)
	results := make(chan parallelResult, total)

	// Start workers
	var wg sync.WaitGroup
	for range DOT_DATA_WORKERS {
		wg.Add(1)
		go ravenWorker(dirCaches, jobs, results, &wg)
	}

	// Send jobs
	for _, ravenFile := range ravenFiles {
		jobs <- ravenJob{ravenFile: ravenFile}
	}
	close(jobs)

	// Wait for workers to finish, then close results
	go func() {
		wg.Wait()
		close(results)
	}()

	// Collect results with progress reporting
	stats := aggregateResults(results, total, &processed, input.Delete, input.ProgressHandler)
	if stats.firstErr != nil {
		errMsg := stats.firstErr.Error()
		output.Error = &errMsg
		return output, stats.firstErr
	}

	sortCallsByFileAndTime(stats.calls)

	output.Calls = stats.calls
	output.TotalCalls = len(stats.calls)
	output.SpeciesCount = stats.speciesCount
	output.DataFilesWritten = stats.dataFilesWritten
	output.DataFilesSkipped = stats.dataFilesSkipped
	output.FilesProcessed = stats.filesProcessed
	output.FilesDeleted = stats.filesDeleted
	return output, nil
}

// ravenWorker processes Raven files from the jobs channel
func ravenWorker(dirCaches *sync.Map, jobs <-chan ravenJob, results chan<- parallelResult, wg *sync.WaitGroup) {
	defer wg.Done()
	for job := range jobs {
		dir := filepath.Dir(job.ravenFile)

		// Get or create DirCache for this directory
		var cache *DirCache
		if cached, ok := dirCaches.Load(dir); ok {
			cache = cached.(*DirCache)
		} else {
			cache = NewDirCache(dir)
			dirCaches.Store(dir, cache)
		}

		calls, written, skipped, err := processRavenFileCached(job.ravenFile, cache)
		results <- ravenResult{
			ravenFile: job.ravenFile,
			calls:     calls,
			written:   written,
			skipped:   skipped,
			err:       err,
		}
	}
}

// findRavenFiles finds all Raven selection files in a folder
func findRavenFiles(folder string) ([]string, error) {
```
```go
func (ravenSource) FindFiles(folder string) ([]string, error) {
```

```go
func (ravenSource) ProcessFile(ravenFile string, cache *DirCache) ([]ClusteredCall, bool, bool, error) {
	return processRavenFileCached(ravenFile, cache)
}

// CallsFromRaven processes Raven selection files and writes .data files
func CallsFromRaven(input CallsFromRavenInput) (CallsFromRavenOutput, error) {
	src := ravenSource{}
	commonInput := CallsFromSourceInput(input)
```

```go
	commonOutput, err := callsFromSource(src, commonInput)

	// Convert to Raven-specific output type
	var output CallsFromRavenOutput
	output.Calls = commonOutput.Calls
	output.TotalCalls = commonOutput.TotalCalls
	output.SpeciesCount = commonOutput.SpeciesCount
	output.DataFilesWritten = commonOutput.DataFilesWritten
	output.DataFilesSkipped = commonOutput.DataFilesSkipped
	output.FilesProcessed = commonOutput.FilesProcessed
	output.FilesDeleted = commonOutput.FilesDeleted
	output.Filter = commonOutput.Filter
	output.Error = commonOutput.Error
	return output, err
}

// RavenSelection represents a single Raven selection
type RavenSelection struct {
	StartTime float64
	EndTime   float64
	FreqLow   float64
	FreqHigh  float64
	Species   string
}
```
```go
package tools

import (
	"fmt"
	"os"
	"path/filepath"
	"sort"
	"sync"
	"sync/atomic"
)

// CallsFromSourceInput defines the common input for calls-from-source tools
type CallsFromSourceInput struct {
	Folder          string          `json:"folder"`
	File            string          `json:"file"`
	Delete          bool            `json:"delete"`
	ProgressHandler ProgressHandler `json:"-"` // Optional progress callback
}

// CallsFromSourceOutput defines the common output for calls-from-source tools
type CallsFromSourceOutput struct {
	Calls            []ClusteredCall `json:"calls"`
	TotalCalls       int             `json:"total_calls"`
	SpeciesCount     map[string]int  `json:"species_count"`
	DataFilesWritten int             `json:"data_files_written"`
	DataFilesSkipped int             `json:"data_files_skipped"`
	FilesProcessed   int             `json:"files_processed"`
	FilesDeleted     int             `json:"files_deleted"`
	Filter           string          `json:"filter"`
	Error            *string         `json:"error,omitempty"`
}

// CallSource abstracts a source of bird call data (Raven, BirdNET, etc.)
type CallSource interface {
	// Name returns the display name (e.g. "Raven", "BirdNET")
	Name() string
	// FindFiles discovers source files in the given folder
	FindFiles(folder string) ([]string, error)
	// ProcessFile processes a single source file and returns calls, write/skip status
	ProcessFile(path string, cache *DirCache) (calls []ClusteredCall, written, skipped bool, err error)
}

// callsFromSource is the shared entry point for all call source tools.
func callsFromSource(src CallSource, input CallsFromSourceInput) (CallsFromSourceOutput, error) {
	var output CallsFromSourceOutput
	output.Filter = src.Name()

	// Collect source files to process
	var files []string
	if input.File != "" {
		files = []string{input.File}
	} else if input.Folder != "" {
		var err error
		files, err = src.FindFiles(input.Folder)
		if err != nil {
			errMsg := fmt.Sprintf("Failed to find %s files: %v", src.Name(), err)
			output.Error = &errMsg
			return output, fmt.Errorf("%s", errMsg)
		}
	} else {
		errMsg := "Either --folder or --file must be specified"
		output.Error = &errMsg
		return output, fmt.Errorf("%s", errMsg)
	}

	if len(files) == 0 {
		errMsg := fmt.Sprintf("No %s files found", src.Name())
		output.Error = &errMsg
		return output, fmt.Errorf("%s", errMsg)
	}

	// Single file or small batch: process sequentially (avoid goroutine overhead)
	if len(files) < 10 {
		return callsFromSourceSequential(src, input, files)
	}

	// Large batch: parallel processing with DirCache
	return callsFromSourceParallel(src, input, files)
}

// callsFromSourceSequential processes source files one at a time (for small batches)
func callsFromSourceSequential(src CallSource, input CallsFromSourceInput, files []string) (CallsFromSourceOutput, error) {
	var output CallsFromSourceOutput
	output.Filter = src.Name()

	// Build DirCache once for the folder
	dirCaches := make(map[string]*DirCache)
	if input.Folder != "" {
		dirCaches[input.Folder] = NewDirCache(input.Folder)
	}

	speciesCount := make(map[string]int)
	var allCalls []ClusteredCall
	dataFilesWritten := 0
	dataFilesSkipped := 0
	filesProcessed := 0
	filesDeleted := 0

	for _, file := range files {
		dir := filepath.Dir(file)
		cache := dirCaches[dir]
		if cache == nil {
			cache = NewDirCache(dir)
			dirCaches[dir] = cache
		}

		calls, written, skipped, err := src.ProcessFile(file, cache)
		if err != nil {
			errMsg := fmt.Sprintf("Error processing %s: %v", file, err)
			output.Error = &errMsg
			return output, fmt.Errorf("%s", errMsg)
		}
		if written {
			dataFilesWritten++
		}
		if skipped {
			dataFilesSkipped++
		}
		for _, call := range calls {
			allCalls = append(allCalls, call)
			speciesCount[call.EbirdCode]++
		}
		filesProcessed++

		// Delete if requested and successfully processed
		if input.Delete && written {
			if err := os.Remove(file); err != nil {
				errMsg := fmt.Sprintf("Failed to delete %s: %v", file, err)
				output.Error = &errMsg
				return output, fmt.Errorf("%s", errMsg)
			}
			filesDeleted++
		}

		if input.ProgressHandler != nil {
			input.ProgressHandler(filesProcessed, len(files), filepath.Base(file))
		}
	}

	// Sort all calls by file, then start time
	sort.Slice(allCalls, func(i, j int) bool {
		if allCalls[i].File != allCalls[j].File {
			return allCalls[i].File < allCalls[j].File
		}
		return allCalls[i].StartTime < allCalls[j].StartTime
	})

	output.Calls = allCalls
	output.TotalCalls = len(allCalls)
	output.SpeciesCount = speciesCount
	output.DataFilesWritten = dataFilesWritten
	output.DataFilesSkipped = dataFilesSkipped
	output.FilesProcessed = filesProcessed
	output.FilesDeleted = filesDeleted
	return output, nil
}

// sourceJob represents a single file to process (generic over CallSource)
type sourceJob struct {
	filePath string
}

// sourceResult represents the result of processing a single source file
type sourceResult struct {
	path    string
	calls   []ClusteredCall
	written bool
	skipped bool
	err     error
}

func (r sourceResult) filePath() string          { return r.path }
func (r sourceResult) getCalls() []ClusteredCall { return r.calls }
func (r sourceResult) wasWritten() bool          { return r.written }
func (r sourceResult) wasSkipped() bool          { return r.skipped }
func (r sourceResult) getError() error           { return r.err }

// callsFromSourceParallel processes source files concurrently using a worker pool and DirCache
func callsFromSourceParallel(src CallSource, input CallsFromSourceInput, files []string) (CallsFromSourceOutput, error) {
	var output CallsFromSourceOutput
	output.Filter = src.Name()

	total := len(files)
	var processed atomic.Int32

	// Build DirCache for the folder
	dirCaches := &sync.Map{}
	if input.Folder != "" {
		cache := NewDirCache(input.Folder)
		dirCaches.Store(input.Folder, cache)
	}

	// Create job and result channels
	jobs := make(chan sourceJob, total)
	results := make(chan parallelResult, total)

	// Start workers
	var wg sync.WaitGroup
	for range DOT_DATA_WORKERS {
		wg.Add(1)
		go sourceWorker(src, dirCaches, jobs, results, &wg)
	}

	// Send jobs
	for _, file := range files {
		jobs <- sourceJob{filePath: file}
	}
	close(jobs)

	// Wait for workers to finish, then close results
	go func() {
		wg.Wait()
		close(results)
	}()

	// Collect results with progress reporting
	stats := aggregateResults(results, total, &processed, input.Delete, input.ProgressHandler)
	if stats.firstErr != nil {
		errMsg := stats.firstErr.Error()
		output.Error = &errMsg
		return output, stats.firstErr
	}

	sortCallsByFileAndTime(stats.calls)

	output.Calls = stats.calls
	output.TotalCalls = len(stats.calls)
	output.SpeciesCount = stats.speciesCount
	output.DataFilesWritten = stats.dataFilesWritten
	output.DataFilesSkipped = stats.dataFilesSkipped
	output.FilesProcessed = stats.filesProcessed
	output.FilesDeleted = stats.filesDeleted
	return output, nil
}

// sourceWorker processes source files from the jobs channel
func sourceWorker(src CallSource, dirCaches *sync.Map, jobs <-chan sourceJob, results chan<- parallelResult, wg *sync.WaitGroup) {
	defer wg.Done()
	for job := range jobs {
		dir := filepath.Dir(job.filePath)

		// Get or create DirCache for this directory
		var cache *DirCache
		if cached, ok := dirCaches.Load(dir); ok {
			cache = cached.(*DirCache)
		} else {
			cache = NewDirCache(dir)
			dirCaches.Store(dir, cache)
		}

		calls, written, skipped, err := src.ProcessFile(job.filePath, cache)
		results <- sourceResult{
			path:    job.filePath,
			calls:   calls,
			written: written,
			skipped: skipped,
			err:     err,
		}
	}
}
```
```go
}

// BirdNETDetection represents a single BirdNET detection
type BirdNETDetection struct {
	StartTime      float64
	EndTime        float64
	ScientificName string
	CommonName     string
	Confidence     float64
	WAVPath        string
}

// birdaJob represents a single BirdNET file to process
type birdaJob struct {
	birdaFile string
}

// birdaResult represents the result of processing a single BirdNET file
type birdaResult struct {
	birdaFile string
	calls     []ClusteredCall
	written   bool
	skipped   bool
	err       error
```

```go
func (r birdaResult) filePath() string          { return r.birdaFile }
func (r birdaResult) getCalls() []ClusteredCall { return r.calls }
func (r birdaResult) wasWritten() bool          { return r.written }
func (r birdaResult) wasSkipped() bool          { return r.skipped }
func (r birdaResult) getError() error           { return r.err }

// CallsFromBirda processes BirdNET results files and writes .data files
func CallsFromBirda(input CallsFromBirdaInput) (CallsFromBirdaOutput, error) {
	var output CallsFromBirdaOutput
	output.Filter = "BirdNET"

	// Collect BirdNET files to process
	var birdaFiles []string
	if input.File != "" {
		birdaFiles = []string{input.File}
	} else if input.Folder != "" {
		var err error
		birdaFiles, err = findBirdaFiles(input.Folder)
		if err != nil {
			errMsg := fmt.Sprintf("Failed to find BirdNET files: %v", err)
			output.Error = &errMsg
			return output, fmt.Errorf("%s", errMsg)
		}
	} else {
		errMsg := "Either --folder or --file must be specified"
		output.Error = &errMsg
		return output, fmt.Errorf("%s", errMsg)
	}
```
```go
	// Large batch: parallel processing with DirCache
	return callsFromBirdaParallel(input, birdaFiles)
}

// callsFromBirdaSequential processes BirdNET files one at a time (for small batches)
func callsFromBirdaSequential(input CallsFromBirdaInput, birdaFiles []string) (CallsFromBirdaOutput, error) {
	var output CallsFromBirdaOutput
	output.Filter = "BirdNET"

	// Build DirCache once for the folder
	dirCaches := make(map[string]*DirCache)
	if input.Folder != "" {
		dirCaches[input.Folder] = NewDirCache(input.Folder)
	}

	speciesCount := make(map[string]int)
	var allCalls []ClusteredCall
	dataFilesWritten := 0
	dataFilesSkipped := 0
	filesProcessed := 0
	filesDeleted := 0

	for _, birdaFile := range birdaFiles {
		dir := filepath.Dir(birdaFile)
		cache := dirCaches[dir]
		if cache == nil {
			cache = NewDirCache(dir)
			dirCaches[dir] = cache
		}

		calls, written, skipped, err := processBirdaFileCached(birdaFile, cache)
		if err != nil {
			errMsg := fmt.Sprintf("Error processing %s: %v", birdaFile, err)
			output.Error = &errMsg
			return output, fmt.Errorf("%s", errMsg)
		}
		if written {
			dataFilesWritten++
		}
		if skipped {
			dataFilesSkipped++
		}
		for _, call := range calls {
			allCalls = append(allCalls, call)
			speciesCount[call.EbirdCode]++
		}
		filesProcessed++

		// Delete if requested and successfully processed
		if input.Delete && written {
			if err := os.Remove(birdaFile); err != nil {
				errMsg := fmt.Sprintf("Failed to delete %s: %v", birdaFile, err)
				output.Error = &errMsg
				return output, fmt.Errorf("%s", errMsg)
			}
			filesDeleted++
		}

		if input.ProgressHandler != nil {
			input.ProgressHandler(filesProcessed, len(birdaFiles), filepath.Base(birdaFile))
		}
	}

	// Sort all calls by file, then start time
	sort.Slice(allCalls, func(i, j int) bool {
		if allCalls[i].File != allCalls[j].File {
			return allCalls[i].File < allCalls[j].File
		}
		return allCalls[i].StartTime < allCalls[j].StartTime
	})

	output.Calls = allCalls
	output.TotalCalls = len(allCalls)
	output.SpeciesCount = speciesCount
	output.DataFilesWritten = dataFilesWritten
	output.DataFilesSkipped = dataFilesSkipped
	output.FilesProcessed = filesProcessed
	output.FilesDeleted = filesDeleted
	return output, nil
}

// callsFromBirdaParallel processes BirdNET files concurrently using a worker pool and DirCache
func callsFromBirdaParallel(input CallsFromBirdaInput, birdaFiles []string) (CallsFromBirdaOutput, error) {
	var output CallsFromBirdaOutput
	output.Filter = "BirdNET"

	total := len(birdaFiles)
	var processed atomic.Int32

	// Build DirCache for the folder
	dirCaches := &sync.Map{}
	if input.Folder != "" {
		cache := NewDirCache(input.Folder)
		dirCaches.Store(input.Folder, cache)
	}

	// Create job and result channels
	jobs := make(chan birdaJob, total)
	results := make(chan parallelResult, total)

	// Start workers
	var wg sync.WaitGroup
	for range DOT_DATA_WORKERS {
		wg.Add(1)
		go birdaWorker(dirCaches, jobs, results, &wg)
	}

	// Send jobs
	for _, birdaFile := range birdaFiles {
		jobs <- birdaJob{birdaFile: birdaFile}
	}
	close(jobs)

	// Wait for workers to finish, then close results
	go func() {
		wg.Wait()
		close(results)
	}()

	// Collect results with progress reporting
	stats := aggregateResults(results, total, &processed, input.Delete, input.ProgressHandler)
	if stats.firstErr != nil {
		errMsg := stats.firstErr.Error()
		output.Error = &errMsg
		return output, stats.firstErr
	}

	sortCallsByFileAndTime(stats.calls)

	output.Calls = stats.calls
	output.TotalCalls = len(stats.calls)
	output.SpeciesCount = stats.speciesCount
	output.DataFilesWritten = stats.dataFilesWritten
	output.DataFilesSkipped = stats.dataFilesSkipped
	output.FilesProcessed = stats.filesProcessed
	output.FilesDeleted = stats.filesDeleted
	return output, nil
}

// birdaWorker processes BirdNET files from the jobs channel
func birdaWorker(dirCaches *sync.Map, jobs <-chan birdaJob, results chan<- parallelResult, wg *sync.WaitGroup) {
	defer wg.Done()
	for job := range jobs {
		dir := filepath.Dir(job.birdaFile)

		// Get or create DirCache for this directory
		var cache *DirCache
		if cached, ok := dirCaches.Load(dir); ok {
			cache = cached.(*DirCache)
		} else {
			cache = NewDirCache(dir)
			dirCaches.Store(dir, cache)
		}

		calls, written, skipped, err := processBirdaFileCached(job.birdaFile, cache)
		results <- birdaResult{
			birdaFile: job.birdaFile,
			calls:     calls,
			written:   written,
			skipped:   skipped,
			err:       err,
		}
	}
}

// findBirdaFiles finds all BirdNET results files in a folder
func findBirdaFiles(folder string) ([]string, error) {
```
```go
func (birdaSource) FindFiles(folder string) ([]string, error) {
```

```go
	commonOutput, err := callsFromSource(src, commonInput)

	// Convert to Birda-specific output type
	var output CallsFromBirdaOutput
	output.Calls = commonOutput.Calls
	output.TotalCalls = commonOutput.TotalCalls
	output.SpeciesCount = commonOutput.SpeciesCount
	output.DataFilesWritten = commonOutput.DataFilesWritten
	output.DataFilesSkipped = commonOutput.DataFilesSkipped
	output.FilesProcessed = commonOutput.FilesProcessed
	output.FilesDeleted = commonOutput.FilesDeleted
	output.Filter = commonOutput.Filter
	output.Error = commonOutput.Error
	return output, err
}

// BirdNETDetection represents a single BirdNET detection
type BirdNETDetection struct {
	StartTime      float64
	EndTime        float64
	ScientificName string
	CommonName     string
	Confidence     float64
	WAVPath        string
}
```
```go
if _, err := fmt.Sscanf(record[idx.startIdx], "%f", &det.StartTime); err != nil {
	return nil, fmt.Errorf("failed to parse start time %q: %w", record[idx.startIdx], err)
```

```go
startTime, perr := strconv.ParseFloat(record[idx.startIdx], 64)
if perr != nil {
	return nil, fmt.Errorf("failed to parse start time %q: %w", record[idx.startIdx], perr)
```

```go
if _, err := fmt.Sscanf(record[idx.endIdx], "%f", &det.EndTime); err != nil {
	return nil, fmt.Errorf("failed to parse end time %q: %w", record[idx.endIdx], err)
```

```go
det.StartTime = startTime

endTime, perr := strconv.ParseFloat(record[idx.endIdx], 64)
if perr != nil {
	return nil, fmt.Errorf("failed to parse end time %q: %w", record[idx.endIdx], perr)
```

```go
if _, err := fmt.Sscanf(record[idx.confidenceIdx], "%f", &det.Confidence); err != nil {
	return nil, fmt.Errorf("failed to parse confidence %q: %w", record[idx.confidenceIdx], err)
```

```go
confidence, perr := strconv.ParseFloat(record[idx.confidenceIdx], 64)
if perr != nil {
	return nil, fmt.Errorf("failed to parse confidence %q: %w", record[idx.confidenceIdx], perr)
```
```go
// Verify dataset exists and is active
var datasetExists bool
err = database.QueryRow("SELECT EXISTS(SELECT 1 FROM dataset WHERE id = ? AND active = true)", input.DatasetID).Scan(&datasetExists)
if err != nil {
	return fmt.Errorf("failed to query dataset: %w", err)
}
if !datasetExists {
	return fmt.Errorf("dataset not found or inactive: %s", input.DatasetID)
}
```

```go
func ValidateDatasetTypeForImport(database *sql.DB, datasetID string) error {
	datasetType, exists, err := GetDatasetType(database, datasetID)
```

```go
func ValidateDatasetTypeForImport(q Querier, datasetID string) error {
	datasetType, exists, err := GetDatasetType(q, datasetID)
```

```go
func ValidateDatasetTypeUnstructured(database *sql.DB, datasetID string) error {
	datasetType, exists, err := GetDatasetType(database, datasetID)
```

```go
func ValidateDatasetTypeUnstructured(q Querier, datasetID string) error {
	datasetType, exists, err := GetDatasetType(q, datasetID)
```
```go
// DatasetExistsAndActive checks that a dataset exists and is active.
// Returns the dataset name if found.
func DatasetExistsAndActive(q Querier, datasetID string) (name string, err error) {
	var exists, active bool
	err = q.QueryRow(
		"SELECT EXISTS(SELECT 1 FROM dataset WHERE id = ?), COALESCE((SELECT active FROM dataset WHERE id = ?), false), COALESCE((SELECT name FROM dataset WHERE id = ?), '')",
		datasetID, datasetID, datasetID,
	).Scan(&exists, &active, &name)
	if err != nil {
		return "", fmt.Errorf("failed to verify dataset: %w", err)
	}
	if !exists {
		return "", fmt.Errorf("dataset with ID '%s' does not exist", datasetID)
	}
	if !active {
		return "", fmt.Errorf("dataset '%s' (ID: %s) is not active", name, datasetID)
	}
	return name, nil
}

// LocationBelongsToDataset checks that a location exists, is active, and belongs to the dataset.
// Returns the location name if found.
func LocationBelongsToDataset(q Querier, locationID, datasetID string) (name string, err error) {
	var exists, active bool
	var locDatasetID string
	err = q.QueryRow(
		"SELECT EXISTS(SELECT 1 FROM location WHERE id = ?), COALESCE((SELECT active FROM location WHERE id = ?), false), COALESCE((SELECT name FROM location WHERE id = ?), ''), COALESCE((SELECT dataset_id FROM location WHERE id = ?), '')",
		locationID, locationID, locationID, locationID,
	).Scan(&exists, &active, &name, &locDatasetID)
	if err != nil {
		return "", fmt.Errorf("failed to verify location: %w", err)
	}
	if !exists {
		return "", fmt.Errorf("location with ID '%s' does not exist", locationID)
	}
	if !active {
		return "", fmt.Errorf("location '%s' (ID: %s) is not active", name, locationID)
	}
	if locDatasetID != datasetID {
		return "", fmt.Errorf("location '%s' (ID: %s) does not belong to dataset ID '%s'",
			name, locationID, locDatasetID)
	}
	return name, nil
}

// ClusterBelongsToLocation checks that a cluster exists, is active, and belongs to the location.
func ClusterBelongsToLocation(q Querier, clusterID, locationID string) error {
	var exists, active bool
	var clusterLocationID string
	err := q.QueryRow(
		"SELECT EXISTS(SELECT 1 FROM cluster WHERE id = ?), COALESCE((SELECT active FROM cluster WHERE id = ?), false), COALESCE((SELECT location_id FROM cluster WHERE id = ?), '')",
		clusterID, clusterID, clusterID,
	).Scan(&exists, &active, &clusterLocationID)
	if err != nil {
		return fmt.Errorf("failed to verify cluster: %w", err)
	}
	if !exists {
		return fmt.Errorf("cluster with ID '%s' does not exist", clusterID)
	}
	if !active {
		return fmt.Errorf("cluster '%s' is not active", clusterID)
	}
	if clusterLocationID != locationID {
		return fmt.Errorf("cluster '%s' does not belong to location '%s'", clusterID, locationID)
	}
	return nil
}
```
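The `Querier` interface these primitives accept isn't shown in the hunks above. Judging from the calls (`q.QueryRow(...)`) and from the fact that both `*sql.DB` connections and `*db.LoggedTx` transactions are passed in, a minimal definition would look like the following sketch; this is an assumption about db/validation.go, not a verbatim copy:

```go
// Querier is the smallest query surface the validation primitives need.
// Both *sql.DB and *LoggedTx would satisfy it (assumed definition).
type Querier interface {
	QueryRow(query string, args ...any) *sql.Row
}
```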
```go
}

// WithReadDB opens a read-only DB connection, calls fn, and closes the connection.
// This is a convenience wrapper that ensures the connection is always closed.
func WithReadDB(dbPath string, fn func(*sql.DB) error) error {
	database, err := OpenReadOnlyDB(dbPath)
	if err != nil {
		return fmt.Errorf("database connection failed: %w", err)
	}
	defer database.Close()
	return fn(database)
```
```go
// WithWriteTx opens a writeable DB connection, begins a logged transaction, calls fn,
// and commits on success (or rolls back on error). The connection is always closed.
// The fn callback receives both the *sql.DB (for pre-validation queries) and the
// *LoggedTx (for mutation operations).
func WithWriteTx(ctx context.Context, dbPath, toolName string, fn func(*sql.DB, *LoggedTx) error) error {
	database, err := OpenWriteableDB(dbPath)
	if err != nil {
		return fmt.Errorf("database connection failed: %w", err)
	}
	defer database.Close()

	tx, err := BeginLoggedTx(ctx, database, toolName)
	if err != nil {
		return fmt.Errorf("failed to begin transaction: %w", err)
	}
	defer tx.Rollback() // no-op after commit

	if err := fn(database, tx); err != nil {
		return err
	}
	return tx.Commit()
}
```
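The unconditional `defer tx.Rollback()` is safe only if `LoggedTx.Rollback` tolerates being called after a successful commit. `database/sql` already guarantees this for the underlying `*sql.Tx` (rollback after commit returns `sql.ErrTxDone` and does nothing), so a wrapper along these lines would preserve that property; the embedding and error-swallowing shown are assumptions about LoggedTx, whose implementation is not in this diff:

```go
// Rollback sketch for LoggedTx: delegate to the embedded *sql.Tx and ignore
// ErrTxDone so the unconditional defer in WithWriteTx stays a no-op after commit.
// (Assumed shape; the actual LoggedTx implementation is not shown here.)
func (t *LoggedTx) Rollback() error {
	if err := t.Tx.Rollback(); err != nil && !errors.Is(err, sql.ErrTxDone) {
		return err
	}
	return nil
}
```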
Four refactoring changes in tools/ and db/:

- **Added `db.WithWriteTx` and `db.WithReadDB` helpers** (db/db.go): Extracted the open-DB→begin-tx→defer-rollback→commit→close-DB boilerplate that was repeated across 14+ tool entry points. `WithWriteTx(ctx, dbPath, name, fn)` opens a writeable DB, begins a logged transaction, calls fn, and commits on success (or rolls back on error). `WithReadDB` does the same for read-only connections. Applied to all create/update functions in cluster, dataset, location, and pattern, plus import_unstructured, import_files (validation), sql, and bulk_file_import (validation). Eliminates inconsistent rollback handling (some call sites used `defer tx.Rollback()`, others `defer func() { if err != nil { tx.Rollback() } }()`) and removes ~100 lines of boilerplate.
- **Introduced a `CallSource` interface** (tools/calls_from_common.go): Extracted shared scaffolding from `calls_from_raven.go` and `calls_from_birda.go` into a `CallSource` interface with `Name()`, `FindFiles()`, and `ProcessFile()` methods. Both files now implement the interface and delegate to `callsFromSource()`, which handles the sequential/parallel dispatch, DirCache management, worker pool, and result aggregation. Public API (`CallsFromRaven`, `CallsFromBirda`) unchanged. ~100 lines saved.
- **Extracted hierarchy validation primitives** (db/validation.go): Added a `Querier` interface plus `DatasetExistsAndActive()`, `LocationBelongsToDataset()`, and `ClusterBelongsToLocation()` to db/validation.go. Replaced three near-duplicate functions: `validateSegmentHierarchy` (import_segments.go, cyclomatic complexity 14→7), `validateHierarchyIDs` (import_files.go, 14→7), and `verifyClusterParentRefs` plus its helpers `verifyDatasetForCluster` and `verifyLocationForCluster` (cluster.go, deleted ~50 lines). Also replaced inline dataset-exists-and-active checks in bulk_file_import.go and import_unstructured.go.
- **Replaced `fmt.Sscanf("%f", ...)` with `strconv.ParseFloat` and deleted `extractFilename`**: Replaced 7 `fmt.Sscanf` calls in calls_from_raven.go and calls_from_birda.go with `strconv.ParseFloat` (faster and more idiomatic). Deleted `extractFilename()` from calls_from_preds.go, which was a one-line wrapper over `filepath.Base`.

Functions with cyclomatic complexity above 13 reduced from 9 to 6. A sketch of how the pieces compose follows this list.
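Taken together, a new tool entry point written against these helpers would reduce to roughly the following shape. This is a hedged sketch assuming a hypothetical `widget` table and `WidgetOutput` type, shown only to illustrate how the helpers compose, not code from the repository:

```go
// createWidget is a hypothetical tool entry point illustrating the refactored
// pattern: validation primitives run against the *sql.DB, mutations go through
// the *LoggedTx, and WithWriteTx owns commit/rollback and connection cleanup.
func createWidget(ctx context.Context, datasetID, name string) (WidgetOutput, error) {
	var output WidgetOutput
	err := db.WithWriteTx(ctx, dbPath, "create_widget", func(database *sql.DB, tx *db.LoggedTx) error {
		// Reuse the shared hierarchy check instead of an inline EXISTS query.
		if _, verr := db.DatasetExistsAndActive(database, datasetID); verr != nil {
			return verr
		}
		id, gerr := utils.GenerateShortID()
		if gerr != nil {
			return fmt.Errorf("failed to generate ID: %w", gerr)
		}
		if _, ierr := tx.ExecContext(ctx,
			"INSERT INTO widget (id, dataset_id, name) VALUES (?, ?, ?)",
			id, datasetID, name,
		); ierr != nil {
			return fmt.Errorf("failed to create widget: %w", ierr)
		}
		output.ID = id
		return nil // commit
	})
	return output, err
}
```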