Routing rule: All writes (create/update/delete) go through Auth Service. All reads go through Web Service. Geospatial Service is internal only.
Context
Currently, POST /geo/save returns 202 "Data accepted" immediately and publishes to Kafka. The consumer processes the message and silently skips duplicates (DuplicateKeyException). The client has no way to know if data was actually saved or skipped as a duplicate.
One youtubeContentID = one transaction with all chapters/places from a YouTube video. Each chapter maps to a different map location and each package is monetizable — the client needs to know the real outcome.
Solution: Job Status Tracking with SSE (Server-Sent Events)
Keep the async Kafka pattern. Use SSE to push the result to the client in real-time — no polling.
Routing Architecture
Writes (create/update/delete): iOS → Gateway (/auth/geo/**) → Auth Service → Geospatial Service
Reads: iOS → Gateway (/web/geo/**) → Web Service → Geospatial Service
Internal only: /geo/** not exposed through gateway
Flow
iOS → POST /auth/geo/save + JWT
Auth Service extracts email from JWT, injects ownerEmail into payload
Auth Service forwards to Geospatial Service
← 202 { trackingId: "uuid", status: "PENDING" }
iOS → GET /auth/geo/status/stream/{trackingId} + JWT (SSE connection opens)
← event: { trackingId: "uuid", status: "PENDING" }
...Kafka consumer finishes...
← event: { trackingId: "uuid", status: "COMPLETED", chaptersCount: 5, placesCount: 3 }
← connection closes
iOS receives COMPLETED/DUPLICATE/FAILED → updates UI immediately
How SSE Works in This System
The key component is SseEmitterManager — an in-memory registry that connects the Kafka consumer thread to the SSE connection thread.
Design Decisions
- trackingId = UUID (not youtubeContentID) — each submission is a distinct trackable job, retries don’t collide
- ownerEmail injected by Auth Service — extracted from JWT, not sent by iOS. Tamper-proof.
- Auth Service uses
JsonNode— no coupling toAdventureTubeDataentity. Justbody.put("ownerEmail", email)and forward. - Storage: same MongoDB database, new
jobStatuscollection - TTL: 7-day auto-cleanup via MongoDB TTL index on
createdAt - Kafka message envelope:
KafkaMessagewrapper carries bothtrackingId+AdventureTubeData - SSE on Tomcat: uses Spring’s
SseEmitter(works with Spring MVC / Tomcat, no WebFlux needed) - Auth Service SSE proxy: Auth Service subscribes to geospatial SSE stream via WebClient and relays events to iOS
- Fallback:
GET /geo/status/{trackingId}REST endpoint kept alongside SSE — for non-SSE clients and testing - Timeout: SseEmitter timeout set to 30 seconds — if consumer hasn’t processed by then, client can reconnect or use REST fallback
New Files — Geospatial Service (7)
1. JobStatusEnum
Path: geospatial-service/.../model/entity/adventuretube/JobStatusEnum.java
public enum JobStatusEnum { PENDING, COMPLETED, DUPLICATE, FAILED }
2. JobStatus (MongoDB document)
Path: geospatial-service/.../model/entity/adventuretube/JobStatus.java
| Field | Type | Notes |
|---|---|---|
id |
String | MongoDB auto-generated |
trackingId |
String | UUID, unique index |
youtubeContentID |
String | Business reference |
status |
JobStatusEnum | PENDING → COMPLETED/DUPLICATE/FAILED |
errorMessage |
String | Only on FAILED |
chaptersCount |
Integer | Only on COMPLETED |
placesCount |
Integer | Only on COMPLETED |
createdAt |
Instant | TTL index, 7-day expiry |
updatedAt |
Instant | Last status change |
3. JobStatusRepository
Path: geospatial-service/.../repository/JobStatusRepository.java
public interface JobStatusRepository extends MongoRepository<JobStatus, String> {
Optional<JobStatus> findByTrackingId(String trackingId);
}
4. JobStatusService
Path: geospatial-service/.../service/JobStatusService.java
Methods:
createPendingJob(youtubeContentID)→ creates PENDING job, returns JobStatus with trackingIdmarkCompleted(trackingId, chaptersCount, placesCount)→ updates + notifies SseEmitterManagermarkDuplicate(trackingId)→ updates + notifies SseEmitterManagermarkFailed(trackingId, errorMessage)→ updates + notifies SseEmitterManagerfindByTrackingId(trackingId)→ for REST fallback endpoint
5. KafkaMessage (envelope)
Path: geospatial-service/.../kafka/KafkaMessage.java
@Data @NoArgsConstructor @AllArgsConstructor
public class KafkaMessage {
private String trackingId;
private AdventureTubeData data;
}
6. SseEmitterManager (the bridge between Kafka consumer and SSE)
Path: geospatial-service/.../sse/SseEmitterManager.java
@Component
public class SseEmitterManager {
// trackingId → SseEmitter (waiting clients)
private final ConcurrentHashMap<String, SseEmitter> emitters = new ConcurrentHashMap<>();
public SseEmitter register(String trackingId, long timeout) {
SseEmitter emitter = new SseEmitter(timeout);
emitters.put(trackingId, emitter);
emitter.onCompletion(() -> emitters.remove(trackingId));
emitter.onTimeout(() -> emitters.remove(trackingId));
emitter.onError(e -> emitters.remove(trackingId));
return emitter;
}
public void send(String trackingId, JobStatus status) {
SseEmitter emitter = emitters.get(trackingId);
if (emitter != null) {
try {
emitter.send(SseEmitter.event()
.name("status")
.data(status));
emitter.complete(); // terminal status — close connection
} catch (IOException e) {
emitter.completeWithError(e);
}
}
}
}
This is the critical piece:
- Controller registers an SseEmitter when client opens SSE connection
- Consumer calls
send()after processing — pushes result and closes connection ConcurrentHashMapis thread-safe for producer/consumer threads- Auto-cleanup on completion/timeout/error
7. JobStatusDTO (optional — response wrapper)
Path: geospatial-service/.../model/dto/JobStatusDTO.java
If needed, a DTO to shape the SSE event data. Can skip if JobStatus entity is fine to expose directly.
Modified Files — Geospatial Service (4)
8. Producer.java
- Change signature:
sendAdventureTubeData(data)→sendAdventureTubeData(String trackingId, AdventureTubeData data) - Wrap in
KafkaMessageenvelope before serializing - Kafka key stays
youtubeContentID(for partitioning)
9. Consumer.java
- Deserialize as
KafkaMessageinstead of rawAdventureTubeData - Inject
JobStatusService - On success:
markCompleted(trackingId, chaptersCount, placesCount)→ triggers SSE push - On
DuplicateKeyException:markDuplicate(trackingId)→ triggers SSE push - On other Exception:
markFailed(trackingId, errorMessage)→ triggers SSE push - Fallback: if deserialization as KafkaMessage fails, try as raw AdventureTubeData (backward compat)
10. AdventureTubeDataController.java
save(): create PENDING job, pass trackingId to producer, returnServiceResponse<JobStatus>with 202- New SSE endpoint:
@GetMapping("/status/stream/{trackingId}")
public SseEmitter streamStatus(@PathVariable String trackingId) {
SseEmitter emitter = sseEmitterManager.register(trackingId, 30_000L);
// Send current status immediately (in case consumer already finished)
jobStatusService.findByTrackingId(trackingId).ifPresent(job -> {
if (job.getStatus() != JobStatusEnum.PENDING) {
try {
emitter.send(SseEmitter.event().name("status").data(job));
emitter.complete();
} catch (IOException e) {
emitter.completeWithError(e);
}
}
});
return emitter;
}
- REST fallback endpoint:
GET /geo/status/{trackingId}→ returns job status or 404
11. JobStatusService.java — notify SseEmitterManager on status change
- Inject
SseEmitterManager - After updating MongoDB, call
sseEmitterManager.send(trackingId, updatedJob)to push to SSE client
Auth-Service Changes (SSE proxy + ownerEmail injection)
Auth Service is the external-facing proxy for all write operations. It uses JsonNode (not AdventureTubeData) to avoid coupling.
12. GeoWriteService.java (new)
Path: auth-service/.../service/GeoWriteService.java
@Service
@RequiredArgsConstructor
public class GeoWriteService {
private final ServiceClient serviceClient;
private final JwtUtil jwtUtil;
@Value("${geospatial-service.url}")
private String geoServiceUrl;
public Mono<ResponseEntity<String>> save(String authorization, JsonNode body) {
String token = authorization.replace("Bearer ", "");
String email = jwtUtil.extractUsername(token);
((ObjectNode) body).put("ownerEmail", email);
return serviceClient.postRawReactive(geoServiceUrl, "/geo/save", body, String.class)
.map(result -> ResponseEntity.accepted().body(result));
}
}
13. GeoWriteController.java (new)
Path: auth-service/.../controller/GeoWriteController.java
@RestController
@RequestMapping("/auth/geo")
@RequiredArgsConstructor
public class GeoWriteController {
private final GeoWriteService geoWriteService;
@PostMapping("/save")
public Mono<ResponseEntity<String>> save(
@RequestHeader("Authorization") String authorization,
@RequestBody JsonNode body) {
return geoWriteService.save(authorization, body);
}
@GetMapping(value = "/status/stream/{trackingId}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> streamStatus(
@RequestHeader("Authorization") String authorization,
@PathVariable String trackingId) {
// Proxy SSE stream from geospatial-service
return serviceClient.getSseStreamReactive(geoServiceUrl,
"/geo/status/stream/" + trackingId, String.class);
}
@GetMapping("/status/{trackingId}")
public Mono<ResponseEntity<String>> getStatus(
@RequestHeader("Authorization") String authorization,
@PathVariable String trackingId) {
return serviceClient.getRawReactive(geoServiceUrl,
"/geo/status/" + trackingId, String.class)
.map(ResponseEntity::ok);
}
}
14. auth-service.yml (config-service)
Add:
geospatial-service:
url: lb://GEOSPATIAL-SERVICE
15. ServiceClient.java (common-api)
Add new methods needed by auth-service:
postRawReactive(baseUrl, path, body, responseType)→Mono<T>getSseStreamReactive(baseUrl, path, responseType)→Flux<T>(for SSE proxy)
Implementation Order
Phase 1: CLAUDE.md update
- Update
.claude/CLAUDE.md— document internal vs external service visibility, write/read routing
Phase 2: Geospatial Service (internal)
JobStatusEnum.java— standalone enumJobStatus.java— MongoDB documentJobStatusRepository.java— repositorySseEmitterManager.java— SSE bridge componentJobStatusService.java— service layer (uses SseEmitterManager)KafkaMessage.java— Kafka envelopeProducer.java— accept trackingId, serialize KafkaMessageConsumer.java— deserialize KafkaMessage, update job status (triggers SSE push)AdventureTubeDataController.java— create job, add SSE + REST endpoints
Phase 3: Auth Service (external proxy)
ServiceClient.java(common-api) — addpostRawReactive,getSseStreamReactiveGeoWriteService.java— save with ownerEmail injectionGeoWriteController.java— POST /auth/geo/save + SSE stream proxy + REST status proxyauth-service.yml— add geospatial-service URL config
Phase 4: Integration Test
- Add awaitility dependency to geospatial-service
pom.xml JobStatusFullFlowIT.java— full async integration test (real Kafka + MongoDB)- Update existing
AdventureTubeDataFullStackIT.java— update save test for new response format
Integration Test: Full Async Flow
Uses real Kafka and real MongoDB via @ActiveProfiles("integration") — no mocks, no Testcontainers.
Dependency (add to geospatial-service/pom.xml):
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
Test Cases
Test 1: New content → COMPLETED (via REST fallback)
@Test
void fullFlow_newContent_shouldComplete() {
// 1. POST /geo/save → 202 with trackingId
// 2. await().atMost(10, SECONDS).untilAsserted(() ->
// GET /geo/status/{trackingId} → status: COMPLETED
// )
// 3. Verify data exists in MongoDB via repository
// 4. Verify ownerEmail is set on the saved document
}
Test 2: Duplicate → DUPLICATE
@Test
void fullFlow_duplicate_shouldMarkDuplicate() {
// 1. Seed data directly in MongoDB (same youtubeContentID)
// 2. POST /geo/save with same youtubeContentID → 202 with trackingId
// 3. await() → GET /geo/status/{trackingId} → status: DUPLICATE
}
Test 3: SSE stream receives event
@Test
void sseStream_shouldReceiveCompletedEvent() {
// 1. POST /geo/save → get trackingId
// 2. Open SSE connection to /geo/status/stream/{trackingId}
// 3. Await event with status: COMPLETED
}
Test 4: Status endpoint returns 404 for unknown trackingId
@Test
void getStatus_unknownTrackingId_shouldReturn404() {
// GET /geo/status/nonexistent-uuid → 404
}
Verification
- Build:
mvn clean compile -pl geospatial-service,auth-service,common-api -am - Test new save (Postman — REST fallback):
POST /auth/geo/savewith JWT + content → 202 with trackingId + status: PENDINGGET /auth/geo/status/{trackingId}with JWT → status: COMPLETED
- Test SSE (curl or Postman):
curl -N -H "Authorization: Bearer {token}" http://gateway:8030/auth/geo/status/stream/{trackingId}→ receives SSE event with COMPLETED
- Test duplicate:
POST /auth/geo/savewith same youtubeContentID → 202 with new trackingId- SSE or REST → status: DUPLICATE
- Verify ownerEmail:
- Check MongoDB document has
ownerEmailfield set from JWT
- Check MongoDB document has
- Reads still work (no auth):
GET /web/geo/data→ returns all data (through web-service, no JWT needed)
- Zipkin: verify trace spans across gateway → auth-service → geospatial-service → Kafka
Breaking Change: The save endpoint moves from POST /web/geo/save to POST /auth/geo/save (requires JWT). The iOS client will need to:
- Send JWT with save requests
- Parse the new JSON response to extract
trackingId - Open SSE connection:
GET /auth/geo/status/stream/{trackingId}(with JWT) - Listen for status event → handle COMPLETED / DUPLICATE / FAILED in UI
