Skip to content

Commit 5be7396

Browse files
vanessavmac, mohamedelabbas1996, and mihow
authored
Support re-processing detections and skipping localizer (#815)
* Set up customizable local processing service * Set up separate docker compose stack, rename ml backend services * WIP: README.md * Improve processing flow * fix: tests and postgres connection * Update READMEs with minimal/example setups * fix: transformers fixed version * Add tests * Typos, warn --> warnings * Add support for Darsa flat-bug * chore: Change the Pipeline class name to FlatBugDetectorPipeline to avoid shadowing the FlatBugDetector model * Move README * Address comment tasks * Update README * Pass in pipeline request config, properly cache models, simplifications * fix: update docker compose instructions & build path * feat: use ["insect"] for the default zero-shot class * feat: try to use faster version of zero-shot detector * feat: use gpu if available * fix: update minimal docker compose build path * Add back crop_image_url * Support re-processing detections and skipping localizer * fix: correctly pass candidate labels for zero shot object detector * Support re-processing detections and skipping localizer * fix: allow empty pipeline request config * fix: allow empty pipeline request config * clean up * fix: ignore detection algorithm during reprocessing * remove flat bug * feat: only use zero shot and HF classifier algorithms * clean up * Expand support for date formats in image filenames (#809) * feat: support more date formats in filenames * feat: update and centralize validation language * fix: change name of the new docker network (#819) * docs: clarify new Detection schema/class * Function for creating detection instances from requests * Add reprocessing to minimal app * Add re-processing test * Fix requirements * Address review comments * Only open source image once * feat: cache huggingface & torch models that are auto-downloaded * fix: leave gpu passthrough as an example, off by default * feat: feature flag & pipeline config for reprocessing detections * fix: spelling in previous feature flag * chore: migration for new & corrected feature 
flags * fix: append detections instead of overriding. add feature flag to tests. --------- Co-authored-by: mohamedelabbas1996 <hack1996man@gmail.com> Co-authored-by: Michael Bunsen <notbot@gmail.com>
1 parent ac4f705 commit 5be7396

22 files changed

+1063
-255
lines changed

ami/main/api/views.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -788,7 +788,7 @@ def perform_create(self, serializer):
788788
obj = serializer.save(user=user)
789789

790790
# Get process_now flag from project feature flags
791-
process_now = project.feature_flags.auto_processs_manual_uploads
791+
process_now = project.feature_flags.auto_process_manual_uploads
792792

793793
# Create source image from the upload
794794
source_image = create_source_image_from_upload(
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
# Generated by Django 4.2.10 on 2025-08-15 21:01
2+
3+
import ami.main.models
4+
from django.db import migrations
5+
import django_pydantic_field.fields
6+
7+
8+
class Migration(migrations.Migration):
9+
dependencies = [
10+
("main", "0066_alter_project_feature_flags_and_more"),
11+
]
12+
13+
operations = [
14+
migrations.AlterField(
15+
model_name="project",
16+
name="feature_flags",
17+
field=django_pydantic_field.fields.PydanticSchemaField(
18+
blank=True,
19+
config=None,
20+
default={"auto_process_manual_uploads": False, "reprocess_existing_detections": False, "tags": False},
21+
schema=ami.main.models.ProjectFeatureFlags,
22+
),
23+
),
24+
]

ami/main/models.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
from ami.base.models import BaseModel
3232
from ami.main import charts
3333
from ami.main.models_future.projects import ProjectSettingsMixin
34+
from ami.ml.schemas import BoundingBox
3435
from ami.users.models import User
3536
from ami.utils.media import calculate_file_checksum, extract_timestamp
3637
from ami.utils.schemas import OrderedEnum
@@ -202,7 +203,8 @@ class ProjectFeatureFlags(pydantic.BaseModel):
202203
"""
203204

204205
tags: bool = False # Whether the project supports tagging taxa
205-
auto_processs_manual_uploads: bool = False # Whether to automatically process uploaded images
206+
auto_process_manual_uploads: bool = False # Whether to automatically process uploaded images
207+
reprocess_existing_detections: bool = False # Whether to reprocess existing detections
206208

207209

208210
default_feature_flags = ProjectFeatureFlags()
@@ -2263,6 +2265,17 @@ class Detection(BaseModel):
22632265
source_image_id: int
22642266
detection_algorithm_id: int
22652267

2268+
def get_bbox(self):
2269+
if self.bbox:
2270+
return BoundingBox(
2271+
x1=self.bbox[0],
2272+
y1=self.bbox[1],
2273+
x2=self.bbox[2],
2274+
y2=self.bbox[3],
2275+
)
2276+
else:
2277+
return None
2278+
22662279
# def bbox(self):
22672280
# return (
22682281
# self.bbox_x,

ami/ml/models/pipeline.py

Lines changed: 80 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
if TYPE_CHECKING:
66
from ami.ml.models import ProcessingService, ProjectPipelineConfig
77
from ami.jobs.models import Job
8-
from ami.main.models import Project
98

109
import collections
1110
import dataclasses
@@ -28,6 +27,7 @@
2827
Deployment,
2928
Detection,
3029
Occurrence,
30+
Project,
3131
SourceImage,
3232
SourceImageCollection,
3333
TaxaList,
@@ -39,7 +39,9 @@
3939
from ami.ml.models.algorithm import Algorithm, AlgorithmCategoryMap
4040
from ami.ml.schemas import (
4141
AlgorithmConfigResponse,
42+
AlgorithmReference,
4243
ClassificationResponse,
44+
DetectionRequest,
4345
DetectionResponse,
4446
PipelineRequest,
4547
PipelineRequestConfigParameters,
@@ -62,6 +64,7 @@ def filter_processed_images(
6264
Return only images that need to be processed by a given pipeline.
6365
An image needs processing if:
6466
1. It has no detections from the pipeline's detection algorithm
67+
or
6568
2. It has detections but they don't have classifications from all the pipeline's classification algorithms
6669
"""
6770
pipeline_algorithms = pipeline.algorithms.all()
@@ -175,6 +178,15 @@ def process_images(
175178
job = Job.objects.get(pk=job_id)
176179
task_logger = job.logger
177180

181+
if project_id:
182+
project = Project.objects.get(pk=project_id)
183+
else:
184+
task_logger.warning(f"Pipeline {pipeline} is not associated with a project")
185+
project = None
186+
187+
pipeline_config = pipeline.get_config(project_id=project_id)
188+
task_logger.info(f"Using pipeline config: {pipeline_config}")
189+
178190
prefiltered_images = list(images)
179191
images = list(filter_processed_images(images=prefiltered_images, pipeline=pipeline, task_logger=task_logger))
180192
if len(images) < len(prefiltered_images):
@@ -192,26 +204,39 @@ def process_images(
192204
task_logger.info(f"Sending {len(images)} images to Pipeline {pipeline}")
193205
urls = [source_image.public_url() for source_image in images if source_image.public_url()]
194206

195-
source_images = [
196-
SourceImageRequest(
197-
id=str(source_image.pk),
198-
url=url,
199-
)
200-
for source_image, url in zip(images, urls)
201-
if url
202-
]
207+
source_image_requests: list[SourceImageRequest] = []
208+
detection_requests: list[DetectionRequest] = []
209+
210+
reprocess_existing_detections = False
211+
# Check if feature flag is enabled to reprocess existing detections
212+
if project and project.feature_flags.reprocess_existing_detections:
213+
# Check if the user wants to reprocess existing detections or ignore them
214+
if pipeline_config.get("reprocess_existing_detections", True):
215+
reprocess_existing_detections = True
216+
217+
for source_image, url in zip(images, urls):
218+
if url:
219+
source_image_request = SourceImageRequest(
220+
id=str(source_image.pk),
221+
url=url,
222+
)
223+
source_image_requests.append(source_image_request)
203224

204-
if not project_id:
205-
task_logger.warning(f"Pipeline {pipeline} is not associated with a project")
225+
if reprocess_existing_detections:
226+
detection_requests += collect_detections(source_image, source_image_request)
206227

207-
config = pipeline.get_config(project_id=project_id)
208-
task_logger.info(f"Using pipeline config: {config}")
228+
if reprocess_existing_detections:
229+
task_logger.info(f"Found {len(detection_requests)} existing detections to reprocess.")
230+
else:
231+
task_logger.info("Reprocessing of existing detections is disabled, sending images without detections.")
209232

210233
request_data = PipelineRequest(
211234
pipeline=pipeline.slug,
212-
source_images=source_images,
213-
config=config,
235+
source_images=source_image_requests,
236+
config=pipeline_config,
237+
detections=detection_requests,
214238
)
239+
task_logger.debug(f"Pipeline request data: {request_data}")
215240

216241
session = create_session()
217242
resp = session.post(endpoint_url, json=request_data.dict())
@@ -230,7 +255,8 @@ def process_images(
230255
pipeline=pipeline.slug,
231256
total_time=0,
232257
source_images=[
233-
SourceImageResponse(id=source_image.id, url=source_image.url) for source_image in source_images
258+
SourceImageResponse(id=source_image_request.id, url=source_image_request.url)
259+
for source_image_request in source_image_requests
234260
],
235261
detections=[],
236262
errors=msg,
@@ -251,6 +277,33 @@ def process_images(
251277
return results
252278

253279

280+
def collect_detections(
281+
source_image: SourceImage,
282+
source_image_request: SourceImageRequest,
283+
) -> list[DetectionRequest]:
284+
"""
285+
Collect existing detections for a source image and send them with pipeline request.
286+
"""
287+
detection_requests: list[DetectionRequest] = []
288+
# Re-process all existing detections if they exist
289+
for detection in source_image.detections.all():
290+
bbox = detection.get_bbox()
291+
if bbox and detection.detection_algorithm:
292+
detection_requests.append(
293+
DetectionRequest(
294+
source_image=source_image_request,
295+
bbox=bbox,
296+
crop_image_url=detection.url(),
297+
algorithm=AlgorithmReference(
298+
name=detection.detection_algorithm.name,
299+
key=detection.detection_algorithm.key,
300+
),
301+
)
302+
)
303+
304+
return detection_requests
305+
306+
254307
def get_or_create_algorithm_and_category_map(
255308
algorithm_config: AlgorithmConfigResponse,
256309
logger: logging.Logger = logger,
@@ -351,23 +404,12 @@ def get_or_create_detection(
351404
serialized_bbox = list(detection_resp.bbox.dict().values())
352405
detection_repr = f"Detection {detection_resp.source_image_id} {serialized_bbox}"
353406

354-
assert detection_resp.algorithm, f"No detection algorithm was specified for detection {detection_repr}"
355-
try:
356-
detection_algo = algorithms_used[detection_resp.algorithm.key]
357-
except KeyError:
358-
raise ValueError(
359-
f"Detection algorithm {detection_resp.algorithm.key} is not a known algorithm. "
360-
"The processing service must declare it in the /info endpoint. "
361-
f"Known algorithms: {list(algorithms_used.keys())}"
362-
)
363-
364407
assert str(detection_resp.source_image_id) == str(
365408
source_image.pk
366409
), f"Detection belongs to a different source image: {detection_repr}"
367410

368411
existing_detection = Detection.objects.filter(
369412
source_image=source_image,
370-
detection_algorithm=detection_algo,
371413
bbox=serialized_bbox,
372414
).first()
373415

@@ -387,6 +429,16 @@ def get_or_create_detection(
387429
detection = existing_detection
388430

389431
else:
432+
assert detection_resp.algorithm, f"No detection algorithm was specified for detection {detection_repr}"
433+
try:
434+
detection_algo = algorithms_used[detection_resp.algorithm.key]
435+
except KeyError:
436+
raise ValueError(
437+
f"Detection algorithm {detection_resp.algorithm.key} is not a known algorithm. "
438+
"The processing service must declare it in the /info endpoint. "
439+
f"Known algorithms: {list(algorithms_used.keys())}"
440+
)
441+
390442
new_detection = Detection(
391443
source_image=source_image,
392444
bbox=serialized_bbox,
@@ -1007,7 +1059,7 @@ def collect_images(
10071059
)
10081060

10091061
def choose_processing_service_for_pipeline(
1010-
self, job_id: int, pipeline_name: str, project_id: int
1062+
self, job_id: int | None, pipeline_name: str, project_id: int
10111063
) -> ProcessingService:
10121064
# @TODO use the cached `last_checked_latency` and a max age to avoid checking every time
10131065

ami/ml/schemas.py

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -112,16 +112,6 @@ class ClassificationResponse(pydantic.BaseModel):
112112
timestamp: datetime.datetime
113113

114114

115-
class DetectionResponse(pydantic.BaseModel):
116-
source_image_id: str
117-
bbox: BoundingBox
118-
inference_time: float | None = None
119-
algorithm: AlgorithmReference
120-
timestamp: datetime.datetime
121-
crop_image_url: str | None = None
122-
classifications: list[ClassificationResponse] = []
123-
124-
125115
class SourceImageRequest(pydantic.BaseModel):
126116
# @TODO bring over new SourceImage & b64 validation from the lepsAI repo
127117
id: str
@@ -144,6 +134,23 @@ class Config:
144134
]
145135

146136

137+
class DetectionRequest(pydantic.BaseModel):
138+
source_image: SourceImageRequest # the 'original' image
139+
bbox: BoundingBox
140+
crop_image_url: str | None = None
141+
algorithm: AlgorithmReference
142+
143+
144+
class DetectionResponse(pydantic.BaseModel):
145+
source_image_id: str
146+
bbox: BoundingBox
147+
inference_time: float | None = None
148+
algorithm: AlgorithmReference
149+
timestamp: datetime.datetime
150+
crop_image_url: str | None = None
151+
classifications: list[ClassificationResponse] = []
152+
153+
147154
class PipelineRequestConfigParameters(dict):
148155
"""Parameters used to configure a pipeline request.
149156
@@ -166,6 +173,7 @@ class PipelineRequestConfigParameters(dict):
166173
class PipelineRequest(pydantic.BaseModel):
167174
pipeline: str
168175
source_images: list[SourceImageRequest]
176+
detections: list[DetectionRequest] | None = None
169177
config: PipelineRequestConfigParameters | dict | None = None
170178

171179

0 commit comments

Comments
 (0)