import { Injectable, inject } from '@angular/core';
import { environment } from '@environment';
import { UntilDestroy, untilDestroyed } from '@ngneat/until-destroy';
import { Store } from '@ngxs/store';
import { CallStatusEnum } from '@shared/models/voip/call-status.enum';
import { SseService } from '@shared/services/sse.service';
import { UserState } from '@store/user/user.state';
import { AddNewCall, UpdateCall } from '@store/voip/voip.action';
import { Observable, filter, map, switchMap } from 'rxjs';

@UntilDestroy()
@Injectable({
  providedIn: 'root',
})
export class VoipSseService {
  readonly #store = inject(Store);
  readonly #sseService = inject(SseService);

  subscribeToVoipEvents(): void {
    const userProfileId$ = this.#store.select(UserState.userProfile).pipe(
      map((userProfile) => userProfile?.id),
      filter((userProfileId) => !!userProfileId)
    );

    this.subscribeToNewCalls(userProfileId$);
    this.subscribeToCallStarted(userProfileId$);
    this.subscribeToCallEnded(userProfileId$);
  }

  subscribeToNewCalls(userProfileId$: Observable<string>): void {
    this.getVoipUpdatesFromMercure(userProfileId$, 'new-call')
      .pipe(
        untilDestroyed(this),
        filter((event) => !!event?.data),
        map((event) => JSON.parse(event.data)),
        map((data) => ({
          callHandle: data.callHandle,
          callerName: data.candidateName,
          callStatus: data.callDirection,
          callStart: undefined,
          eventId: data.eventId,
          candidateId: data.candidateId,
        }))
      )
      .subscribe({
        next: (newCall) => {
          this.#store.dispatch(new AddNewCall(newCall));
        },
      });
  }

  subscribeToCallStarted(userProfileId$: Observable<string>): void {
    this.getVoipUpdatesFromMercure(userProfileId$, 'call-started')
      .pipe(
        untilDestroyed(this),
        map((event) => JSON.parse(event.data)),
        filter((data) => !!data)
      )
      .subscribe({
        next: (callStarted) => {
          this.#store.dispatch(
            new UpdateCall(callStarted.callHandle, CallStatusEnum.Current, new Date(callStarted.startedAt * 1000))
          );
        },
      });
  }

  subscribeToCallEnded(userProfileId$: Observable<string>): void {
    this.getVoipUpdatesFromMercure(userProfileId$, 'call-ended')
      .pipe(map((event) => JSON.parse(event.data)?.callHandle))
      .subscribe({
        next: (callHandle) => {
          this.#store.dispatch(new UpdateCall(callHandle, CallStatusEnum.Ended, undefined));
        },
      });
  }

  getVoipUpdatesFromMercure(userProfileId$: Observable<string>, topic: string): Observable<any> {
    return userProfileId$.pipe(
      switchMap((userProfileId) =>
        this.#sseService.getServerSentEvent(
          `${environment.api.mercure}?topic=http://sourcii/voip/${userProfileId}/${topic}`
        )
      )
    );
  }
}
