Kafka Job Status Tracking for POST /geo/save (SSE) — Implementation Plan

Routing rule: All writes (create/update/delete) go through Auth Service. All reads go through Web Service. Geospatial Service is internal only.

Context

Currently, POST /geo/save returns 202 "Data accepted" immediately and publishes to Kafka. The consumer processes the message and silently skips duplicates (DuplicateKeyException). The client has no way to know if data was actually saved or skipped as a duplicate.

One youtubeContentID = one transaction with all chapters/places from a YouTube video. Each chapter maps to a different map location and each package is monetizable — the client needs to know the real outcome.

Solution: Job Status Tracking with SSE (Server-Sent Events)

Keep the async Kafka pattern. Use SSE to push the result to the client in real-time — no polling.

Routing Architecture

Writes (create/update/delete):  iOS → Gateway (/auth/geo/**) → Auth Service → Geospatial Service
Reads:                          iOS → Gateway (/web/geo/**)  → Web Service  → Geospatial Service
Internal only:                  /geo/** not exposed through gateway

Flow

iOS → POST /auth/geo/save + JWT
    Auth Service extracts email from JWT, injects ownerEmail into payload
    Auth Service forwards to Geospatial Service
    ← 202 { trackingId: "uuid", status: "PENDING" }

iOS → GET /auth/geo/status/stream/{trackingId} + JWT     (SSE connection opens)
    ← event: { trackingId: "uuid", status: "PENDING" }
    ...Kafka consumer finishes...
    ← event: { trackingId: "uuid", status: "COMPLETED", chaptersCount: 5, placesCount: 3 }
    ← connection closes

iOS receives COMPLETED/DUPLICATE/FAILED → updates UI immediately
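
To make the flow above concrete, here is a rough sketch of how a client might split the raw text/event-stream payload into events. It is not part of the plan itself: SseFrames and Frame are hypothetical names. The sketch assumes "\n" line endings, and it trims field values rather than following the SSE rule of stripping a single leading space.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: splits a raw text/event-stream payload into events. */
final class SseFrames {

    /** One parsed event: the "event:" name and the joined "data:" lines. */
    record Frame(String event, String data) {}

    static List<Frame> parse(String raw) {
        List<Frame> frames = new ArrayList<>();
        String event = "message";              // default event name when none is sent
        StringBuilder data = new StringBuilder();
        boolean hasData = false;

        for (String line : raw.split("\n", -1)) {
            if (line.isEmpty()) {              // a blank line ends the current event
                if (hasData) {
                    frames.add(new Frame(event, data.toString()));
                }
                event = "message";
                data.setLength(0);
                hasData = false;
            } else if (line.startsWith("event:")) {
                event = line.substring(6).trim();
            } else if (line.startsWith("data:")) {
                if (hasData) {
                    data.append('\n');         // multiple data lines are joined with newlines
                }
                data.append(line.substring(5).trim());
                hasData = true;
            }
            // Comment lines (":...") and the id/retry fields are ignored in this sketch.
        }
        return frames;
    }
}
```

A client would read the data of the "status" event as JSON and stop listening once the status is anything other than PENDING.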

How SSE Works in This System

The key component is SseEmitterManager — an in-memory registry that connects the Kafka consumer thread to the SSE connection thread.

Design Decisions

  • trackingId = UUID (not youtubeContentID) — each submission is a distinct trackable job, retries don’t collide
  • ownerEmail injected by Auth Service — extracted from JWT, not sent by iOS. Tamper-proof.
  • Auth Service uses JsonNode — no coupling to AdventureTubeData entity. Just body.put("ownerEmail", email) and forward.
  • Storage: same MongoDB database, new jobStatus collection
  • TTL: 7-day auto-cleanup via MongoDB TTL index on createdAt
  • Kafka message envelope: KafkaMessage wrapper carries both trackingId + AdventureTubeData
  • SSE on Tomcat: uses Spring’s SseEmitter (works with Spring MVC / Tomcat, no WebFlux needed)
  • Auth Service SSE proxy: Auth Service subscribes to geospatial SSE stream via WebClient and relays events to iOS
  • Fallback: GET /geo/status/{trackingId} REST endpoint kept alongside SSE — for non-SSE clients and testing
  • Timeout: SseEmitter timeout set to 30 seconds — if consumer hasn’t processed by then, client can reconnect or use REST fallback

New Files — Geospatial Service (7)

1. JobStatusEnum

Path: geospatial-service/.../model/entity/adventuretube/JobStatusEnum.java

public enum JobStatusEnum { PENDING, COMPLETED, DUPLICATE, FAILED }

2. JobStatus (MongoDB document)

Path: geospatial-service/.../model/entity/adventuretube/JobStatus.java

Field            | Type          | Notes
id               | String        | MongoDB auto-generated
trackingId       | String        | UUID, unique index
youtubeContentID | String        | Business reference
status           | JobStatusEnum | PENDING → COMPLETED/DUPLICATE/FAILED
errorMessage     | String        | Only on FAILED
chaptersCount    | Integer       | Only on COMPLETED
placesCount      | Integer       | Only on COMPLETED
createdAt        | Instant       | TTL index, 7-day expiry
updatedAt        | Instant       | Last status change

3. JobStatusRepository

Path: geospatial-service/.../repository/JobStatusRepository.java

public interface JobStatusRepository extends MongoRepository<JobStatus, String> {
    Optional<JobStatus> findByTrackingId(String trackingId);
}

4. JobStatusService

Path: geospatial-service/.../service/JobStatusService.java

Methods:

  • createPendingJob(youtubeContentID) → creates PENDING job, returns JobStatus with trackingId
  • markCompleted(trackingId, chaptersCount, placesCount) → updates + notifies SseEmitterManager
  • markDuplicate(trackingId) → updates + notifies SseEmitterManager
  • markFailed(trackingId, errorMessage) → updates + notifies SseEmitterManager
  • findByTrackingId(trackingId) → for REST fallback endpoint
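
The method list above can be sketched with an in-memory map standing in for MongoDB and a callback standing in for SseEmitterManager. All type names here (SketchStatus, SketchJob, InMemoryJobStatusService) are hypothetical. The rule that a terminal status is never overwritten is an assumption added for illustration; the plan does not specify it.

```java
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;

enum SketchStatus { PENDING, COMPLETED, DUPLICATE, FAILED }

record SketchJob(String trackingId, String youtubeContentID, SketchStatus status,
                 Integer chaptersCount, Integer placesCount, String errorMessage) {}

/** In-memory stand-in for JobStatusService; the notifier plays the SSE manager's role. */
final class InMemoryJobStatusService {
    private final Map<String, SketchJob> jobs = new ConcurrentHashMap<>();
    private final BiConsumer<String, SketchJob> notifier;

    InMemoryJobStatusService(BiConsumer<String, SketchJob> notifier) {
        this.notifier = notifier;
    }

    SketchJob createPendingJob(String youtubeContentID) {
        SketchJob job = new SketchJob(UUID.randomUUID().toString(), youtubeContentID,
                SketchStatus.PENDING, null, null, null);
        jobs.put(job.trackingId(), job);
        return job;
    }

    void markCompleted(String trackingId, int chaptersCount, int placesCount) {
        transition(trackingId, SketchStatus.COMPLETED, chaptersCount, placesCount, null);
    }

    void markDuplicate(String trackingId) {
        transition(trackingId, SketchStatus.DUPLICATE, null, null, null);
    }

    void markFailed(String trackingId, String errorMessage) {
        transition(trackingId, SketchStatus.FAILED, null, null, errorMessage);
    }

    Optional<SketchJob> findByTrackingId(String trackingId) {
        return Optional.ofNullable(jobs.get(trackingId));
    }

    private void transition(String trackingId, SketchStatus next,
                            Integer chapters, Integer places, String error) {
        boolean[] changed = {false};
        SketchJob updated = jobs.computeIfPresent(trackingId, (id, current) -> {
            if (current.status() != SketchStatus.PENDING) {
                return current;                // assumption: terminal states are final
            }
            changed[0] = true;
            return new SketchJob(id, current.youtubeContentID(), next, chapters, places, error);
        });
        if (changed[0]) {
            notifier.accept(trackingId, updated);  // update first, then notify
        }
    }
}
```

The ordering is the point of the sketch: the status is persisted before any listener is told about it, so a client that falls back to the REST endpoint never sees an older state than the one pushed over SSE.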

5. KafkaMessage (envelope)

Path: geospatial-service/.../kafka/KafkaMessage.java

@Data @NoArgsConstructor @AllArgsConstructor
public class KafkaMessage {
    private String trackingId;
    private AdventureTubeData data;
}

6. SseEmitterManager (the bridge between Kafka consumer and SSE)

Path: geospatial-service/.../sse/SseEmitterManager.java

@Component
public class SseEmitterManager {

    // trackingId → SseEmitter (waiting clients)
    private final ConcurrentHashMap<String, SseEmitter> emitters = new ConcurrentHashMap<>();

    public SseEmitter register(String trackingId, long timeout) {
        SseEmitter emitter = new SseEmitter(timeout);
        emitters.put(trackingId, emitter);
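        // Remove only if the map still holds this emitter, so a late callback
        // from an old connection cannot evict a newer one for the same trackingId.
        emitter.onCompletion(() -> emitters.remove(trackingId, emitter));
        emitter.onTimeout(() -> emitters.remove(trackingId, emitter));
        emitter.onError(e -> emitters.remove(trackingId, emitter));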
        return emitter;
    }

    public void send(String trackingId, JobStatus status) {
        SseEmitter emitter = emitters.get(trackingId);
        if (emitter != null) {
            try {
                emitter.send(SseEmitter.event()
                    .name("status")
                    .data(status));
                emitter.complete();  // terminal status — close connection
            } catch (IOException e) {
                emitter.completeWithError(e);
            }
        }
    }
}

This is the critical piece:

  • Controller registers an SseEmitter when client opens SSE connection
  • Consumer calls send() after processing — pushes result and closes connection
  • ConcurrentHashMap is thread-safe for producer/consumer threads
  • Auto-cleanup on completion/timeout/error
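
One subtlety of keying the registry by trackingId is reconnects: if a client opens a second stream for the same trackingId, cleanup callbacks from the first connection may still fire afterwards. The sketch below uses a hypothetical EmitterRegistrySketch class, with plain objects standing in for SseEmitter. It contrasts removing by key alone with ConcurrentHashMap's two-argument remove(key, value).

```java
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical registry; E stands in for SseEmitter so the sketch needs only the JDK. */
final class EmitterRegistrySketch<E> {
    private final ConcurrentHashMap<String, E> emitters = new ConcurrentHashMap<>();

    /** Registers a connection; a reconnect for the same trackingId replaces the old entry. */
    void register(String trackingId, E emitter) {
        emitters.put(trackingId, emitter);
    }

    /** Cleanup by key alone: removes whatever is registered, even a newer connection. */
    void removeByKey(String trackingId) {
        emitters.remove(trackingId);
    }

    /** Cleanup that removes the entry only if it still maps to this exact emitter. */
    void removeIfSame(String trackingId, E emitter) {
        emitters.remove(trackingId, emitter);
    }

    E lookup(String trackingId) {
        return emitters.get(trackingId);
    }
}
```

With removeIfSame, a stale callback from the first connection leaves the second connection registered, so the consumer's push still reaches the client that is actually listening.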

7. JobStatusDTO (optional — response wrapper)

Path: geospatial-service/.../model/dto/JobStatusDTO.java

If needed, a DTO to shape the SSE event data. Can skip if JobStatus entity is fine to expose directly.


Modified Files — Geospatial Service (4)

8. Producer.java

  • Change signature: sendAdventureTubeData(data) → sendAdventureTubeData(String trackingId, AdventureTubeData data)
  • Wrap in KafkaMessage envelope before serializing
  • Kafka key stays youtubeContentID (for partitioning)

9. Consumer.java

  • Deserialize as KafkaMessage instead of raw AdventureTubeData
  • Inject JobStatusService
  • On success: markCompleted(trackingId, chaptersCount, placesCount) → triggers SSE push
  • On DuplicateKeyException: markDuplicate(trackingId) → triggers SSE push
  • On other Exception: markFailed(trackingId, errorMessage) → triggers SSE push
  • Fallback: if deserialization as KafkaMessage fails, try as raw AdventureTubeData (backward compat)
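
The three outcome branches above amount to mapping the result of the save step to exactly one terminal status. A minimal sketch follows; ConsumerOutcomeSketch and DuplicateKeySketchException are hypothetical stand-ins (the latter for Spring's DuplicateKeyException) so that the example compiles with the JDK alone.

```java
/** Hypothetical sketch of the consumer's outcome mapping. */
final class ConsumerOutcomeSketch {

    enum Outcome { COMPLETED, DUPLICATE, FAILED }

    /** Stand-in for Spring's DuplicateKeyException. */
    static final class DuplicateKeySketchException extends RuntimeException {
        DuplicateKeySketchException(String message) {
            super(message);
        }
    }

    /** Runs the save step and maps its result to exactly one terminal outcome. */
    static Outcome process(Runnable saveStep) {
        try {
            saveStep.run();
            return Outcome.COMPLETED;   // real consumer: markCompleted(trackingId, ...)
        } catch (DuplicateKeySketchException e) {
            return Outcome.DUPLICATE;   // real consumer: markDuplicate(trackingId)
        } catch (RuntimeException e) {
            return Outcome.FAILED;      // real consumer: markFailed(trackingId, e.getMessage())
        }
    }
}
```

The duplicate-key catch has to come before the general one, since the duplicate exception is itself a runtime exception and would otherwise be reported as FAILED.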

10. AdventureTubeDataController.java

  • save(): create PENDING job, pass trackingId to producer, return ServiceResponse<JobStatus> with 202
  • New SSE endpoint:
@GetMapping("/status/stream/{trackingId}")
public SseEmitter streamStatus(@PathVariable String trackingId) {
    SseEmitter emitter = sseEmitterManager.register(trackingId, 30_000L);
    // Send current status immediately (in case consumer already finished)
    jobStatusService.findByTrackingId(trackingId).ifPresent(job -> {
        if (job.getStatus() != JobStatusEnum.PENDING) {
            try {
                emitter.send(SseEmitter.event().name("status").data(job));
                emitter.complete();
            } catch (IOException e) {
                emitter.completeWithError(e);
            }
        }
    });
    return emitter;
}
  • REST fallback endpoint: GET /geo/status/{trackingId} → returns job status or 404

11. JobStatusService.java — notify SseEmitterManager on status change

  • Inject SseEmitterManager
  • After updating MongoDB, call sseEmitterManager.send(trackingId, updatedJob) to push to SSE client

Auth-Service Changes (SSE proxy + ownerEmail injection)

Auth Service is the external-facing proxy for all write operations. It uses JsonNode (not AdventureTubeData) to avoid coupling.

12. GeoWriteService.java (new)

Path: auth-service/.../service/GeoWriteService.java

@Service
@RequiredArgsConstructor
public class GeoWriteService {
    private final ServiceClient serviceClient;
    private final JwtUtil jwtUtil;

    @Value("${geospatial-service.url}")
    private String geoServiceUrl;

    public Mono<ResponseEntity<String>> save(String authorization, JsonNode body) {
        String token = authorization.replace("Bearer ", "");
        String email = jwtUtil.extractUsername(token);
        ((ObjectNode) body).put("ownerEmail", email);
        return serviceClient.postRawReactive(geoServiceUrl, "/geo/save", body, String.class)
                .map(result -> ResponseEntity.accepted().body(result));
    }
}
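
The plain replace("Bearer ", "") above works for well-formed headers. If stricter handling is wanted, token extraction could look roughly like the sketch below. BearerTokens is a hypothetical helper, not part of the plan, and it only isolates the token; validation stays with JwtUtil.

```java
import java.util.Optional;

/** Hypothetical helper: pulls the token out of an "Authorization: Bearer ..." header value. */
final class BearerTokens {
    private static final String PREFIX = "Bearer ";

    static Optional<String> extract(String authorizationHeader) {
        if (authorizationHeader == null) {
            return Optional.empty();
        }
        String header = authorizationHeader.trim();
        // The auth scheme name is case-insensitive, so compare the prefix ignoring case.
        if (!header.regionMatches(true, 0, PREFIX, 0, PREFIX.length())) {
            return Optional.empty();
        }
        String token = header.substring(PREFIX.length()).trim();
        return token.isEmpty() ? Optional.empty() : Optional.of(token);
    }
}
```

An empty result would let the service reject the request with 401 before anything is forwarded to the Geospatial Service.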

13. GeoWriteController.java (new)

Path: auth-service/.../controller/GeoWriteController.java

@RestController
@RequestMapping("/auth/geo")
@RequiredArgsConstructor
public class GeoWriteController {
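    private final GeoWriteService geoWriteService;
    private final ServiceClient serviceClient;   // used by the two status proxies below

    @Value("${geospatial-service.url}")
    private String geoServiceUrl;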

    @PostMapping("/save")
    public Mono<ResponseEntity<String>> save(
            @RequestHeader("Authorization") String authorization,
            @RequestBody JsonNode body) {
        return geoWriteService.save(authorization, body);
    }

    @GetMapping(value = "/status/stream/{trackingId}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> streamStatus(
            @RequestHeader("Authorization") String authorization,
            @PathVariable String trackingId) {
        // Proxy SSE stream from geospatial-service
        return serviceClient.getSseStreamReactive(geoServiceUrl,
                "/geo/status/stream/" + trackingId, String.class);
    }

    @GetMapping("/status/{trackingId}")
    public Mono<ResponseEntity<String>> getStatus(
            @RequestHeader("Authorization") String authorization,
            @PathVariable String trackingId) {
        return serviceClient.getRawReactive(geoServiceUrl,
                "/geo/status/" + trackingId, String.class)
                .map(ResponseEntity::ok);
    }
}

14. auth-service.yml (config-service)

Add:

geospatial-service:
  url: lb://GEOSPATIAL-SERVICE

15. ServiceClient.java (common-api)

Add new methods needed by auth-service:

  • postRawReactive(baseUrl, path, body, responseType) → Mono<T>
  • getSseStreamReactive(baseUrl, path, responseType) → Flux<T> (for SSE proxy)

Implementation Order

Phase 1: CLAUDE.md update

  1. Update .claude/CLAUDE.md — document internal vs external service visibility, write/read routing

Phase 2: Geospatial Service (internal)

  1. JobStatusEnum.java — standalone enum
  2. JobStatus.java — MongoDB document
  3. JobStatusRepository.java — repository
  4. SseEmitterManager.java — SSE bridge component
  5. JobStatusService.java — service layer (uses SseEmitterManager)
  6. KafkaMessage.java — Kafka envelope
  7. Producer.java — accept trackingId, serialize KafkaMessage
  8. Consumer.java — deserialize KafkaMessage, update job status (triggers SSE push)
  9. AdventureTubeDataController.java — create job, add SSE + REST endpoints

Phase 3: Auth Service (external proxy)

  1. ServiceClient.java (common-api) — add postRawReactive, getSseStreamReactive
  2. GeoWriteService.java — save with ownerEmail injection
  3. GeoWriteController.java — POST /auth/geo/save + SSE stream proxy + REST status proxy
  4. auth-service.yml — add geospatial-service URL config

Phase 4: Integration Test

  1. Add awaitility dependency to geospatial-service pom.xml
  2. JobStatusFullFlowIT.java — full async integration test (real Kafka + MongoDB)
  3. Update existing AdventureTubeDataFullStackIT.java — update save test for new response format

Integration Test: Full Async Flow

Uses real Kafka and real MongoDB via @ActiveProfiles("integration") — no mocks, no Testcontainers.

Dependency (add to geospatial-service/pom.xml):

<dependency>
    <groupId>org.awaitility</groupId>
    <artifactId>awaitility</artifactId>
    <scope>test</scope>
</dependency>

Test Cases

Test 1: New content → COMPLETED (via REST fallback)

@Test
void fullFlow_newContent_shouldComplete() {
    // 1. POST /geo/save → 202 with trackingId
    // 2. await().atMost(10, SECONDS).untilAsserted(() ->
    //        GET /geo/status/{trackingId} → status: COMPLETED
    //    )
    // 3. Verify data exists in MongoDB via repository
    // 4. Verify ownerEmail is set on the saved document
}
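
For readers unfamiliar with Awaitility, the await().atMost(...).untilAsserted(...) step in the test above is essentially a bounded polling loop. The sketch below is a simplified JDK-only approximation, not Awaitility's implementation. PollSketch and waitUntil are hypothetical names, and the helper returns a boolean instead of rethrowing an assertion failure.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

/** Hypothetical JDK-only approximation of a bounded "await until" loop. */
final class PollSketch {

    /** Polls the condition until it holds or the timeout elapses; returns whether it held. */
    static boolean waitUntil(BooleanSupplier condition, Duration timeout, Duration interval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() >= deadline) {
                return false;                        // gave up: the condition never held
            }
            try {
                Thread.sleep(interval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();  // preserve the interrupt for callers
                return false;
            }
        }
    }
}
```

In the integration test the condition would be "GET /geo/status/{trackingId} reports COMPLETED", checked repeatedly while the Kafka consumer works in the background.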

Test 2: Duplicate → DUPLICATE

@Test
void fullFlow_duplicate_shouldMarkDuplicate() {
    // 1. Seed data directly in MongoDB (same youtubeContentID)
    // 2. POST /geo/save with same youtubeContentID → 202 with trackingId
    // 3. await() → GET /geo/status/{trackingId} → status: DUPLICATE
}

Test 3: SSE stream receives event

@Test
void sseStream_shouldReceiveCompletedEvent() {
    // 1. POST /geo/save → get trackingId
    // 2. Open SSE connection to /geo/status/stream/{trackingId}
    // 3. Await event with status: COMPLETED
}

Test 4: Status endpoint returns 404 for unknown trackingId

@Test
void getStatus_unknownTrackingId_shouldReturn404() {
    // GET /geo/status/nonexistent-uuid → 404
}

Verification

  1. Build: mvn clean compile -pl geospatial-service,auth-service,common-api -am
  2. Test new save (Postman — REST fallback):
    • POST /auth/geo/save with JWT + content → 202 with trackingId + status: PENDING
    • GET /auth/geo/status/{trackingId} with JWT → status: COMPLETED
  3. Test SSE (curl or Postman):
    • curl -N -H "Authorization: Bearer {token}" http://gateway:8030/auth/geo/status/stream/{trackingId} → receives SSE event with COMPLETED
  4. Test duplicate:
    • POST /auth/geo/save with same youtubeContentID → 202 with new trackingId
    • SSE or REST → status: DUPLICATE
  5. Verify ownerEmail:
    • Check MongoDB document has ownerEmail field set from JWT
  6. Reads still work (no auth):
    • GET /web/geo/data → returns all data (through web-service, no JWT needed)
  7. Zipkin: verify trace spans across gateway → auth-service → geospatial-service → Kafka

Breaking Change: The save endpoint moves from POST /web/geo/save to POST /auth/geo/save (requires JWT). The iOS client will need to:

  1. Send JWT with save requests
  2. Parse the new JSON response to extract trackingId
  3. Open SSE connection: GET /auth/geo/status/stream/{trackingId} (with JWT)
  4. Listen for status event → handle COMPLETED / DUPLICATE / FAILED in UI
