import { Injectable } from '@angular/core';
import {
  NotificationControls,
  SocketRequest,
  SocketResponse,
} from 'generated/src/main/proto/shared/notification-shared.pb';
import {
  EMPTY,
  Observable,
  Subject,
  catchError,
  lastValueFrom,
  tap,
  throwError,
  timer,
} from 'rxjs';

import {
  GetControlsRequest,
  GetControlsResponse,
  GetRequest,
  GetResponse,
  SetSeenRequest,
  SetSeenResponse,
  UpdateControlsRequest,
  UpdateControlsResponse,
} from 'generated/src/main/proto/api/notification-service.pb';
import { NotificationServiceClient } from 'generated/src/main/proto/api/notification-service.pbsc';
import { WebSocketSubject, webSocket } from 'rxjs/webSocket';
import { AppConfigProvider } from 'src/environments/app-config';
import { getSessionId } from '../api.auth';
import { BannerMessage, BannerService } from '../banner/banner.service';
import { LoggedInUser, UserService } from '../user/user.service';

/** Listens to notifications from the websocket server. */
@Injectable({
  providedIn: 'root',
})
export class NotificationsService {
  private socketSubject: WebSocketSubject<object> | undefined;
  private readonly responseSubject = new Subject<SocketResponse>();

  constructor(
    private userService: UserService,
    private bannerService: BannerService,
    private notificationServiceClient: NotificationServiceClient,
  ) {
    userService.user.subscribe((user) => this.onUserChanged(user));
    document.addEventListener('visibilitychange', () =>
      this.onDocumentVisibilityChanged(document.hidden),
    );
  }

  /** Connects to the websocket server. */
  startListening(): void {
    if (!this.socketSubject) {
      this.connect();
    }
  }

  /** Notifications observable. */
  get responseObservable(): Observable<SocketResponse> {
    return this.responseSubject;
  }

  /** Reads all the notifications for the user. */
  async get(): Promise<GetResponse> {
    return lastValueFrom(
      this.notificationServiceClient
        .get(new GetRequest(), this.userService.userTokenMetadata)
        .pipe(
          catchError((e) => {
            this.bannerService.add(new BannerMessage(e.statusMessage));
            return throwError(() => e);
          }),
        ),
    );
  }

  /** Marks notifications as read. */
  async setSeen(notificationIds: string[]): Promise<SetSeenResponse> {
    return lastValueFrom(
      this.notificationServiceClient
        .setSeen(
          new SetSeenRequest({ notificationIds: notificationIds }),
          this.userService.userTokenMetadata,
        )
        .pipe(
          catchError((e) => {
            this.bannerService.add(new BannerMessage(e.statusMessage));
            return throwError(() => e);
          }),
        ),
    );
  }

  /** Reads notification controls. */
  async getControls(): Promise<GetControlsResponse> {
    return lastValueFrom(
      this.notificationServiceClient
        .getControls(
          new GetControlsRequest(),
          this.userService.userTokenMetadata,
        )
        .pipe(
          catchError((e) => {
            this.bannerService.add(new BannerMessage(e.statusMessage));
            return throwError(() => e);
          }),
        ),
    );
  }

  /** Reads notification controls. */
  async updateControls(
    controls: NotificationControls,
  ): Promise<UpdateControlsResponse> {
    return lastValueFrom(
      this.notificationServiceClient
        .updateControls(
          new UpdateControlsRequest({ controls: controls }),
          this.userService.userTokenMetadata,
        )
        .pipe(
          catchError((e) => {
            this.bannerService.add(new BannerMessage(e.statusMessage));
            return throwError(() => e);
          }),
        ),
    );
  }

  /** Reconnects to the websocket server. */
  private onUserChanged(user: LoggedInUser | undefined): void {
    this.disconnect();

    if (user && user.email) {
      this.connect();
    }
  }

  /** If connection was lost while document was hidden, restore it. */
  private onDocumentVisibilityChanged(isHidden: boolean): void {
    if (!isHidden) {
      this.reconnectIfNotConnected();
    }
  }

  private reconnectIfNotConnected(): void {
    const reconnect = !this.socketSubject || this.socketSubject.closed;
    console.log('Attempting to reconnect to websocket: ' + reconnect);
    if (reconnect) {
      this.connect();
    }
  }

  /** Connects to the websocket server. */
  private connect(): void {
    // Close the socket if it was open.
    this.disconnect();

    const sessionId = getSessionId();
    if (!sessionId) {
      console.error('Session id is null');
      return;
    }

    // Create socket client and subscribe to responses.
    const wssUrl =
      AppConfigProvider.config.apiServerHost.replace('https://', 'wss://') +
      '/websocket/notifications';
    this.socketSubject = webSocket<object>({
      url: wssUrl,
      closeObserver: {
        next: () => {
          console.log('Websocket closed');
          this.disconnect();
          timer(5000).subscribe(() => this.reconnectIfNotConnected());
        },
      },
    });
    this.socketSubject
      .pipe(
        tap({
          error: (error) => console.error(error),
        }),
        catchError(() => EMPTY),
      )
      .subscribe((jsonObject) => {
        const response = new SocketResponse(jsonObject);
        if (response.connect) {
          console.log('Connected to websocket');
        } else {
          console.log('Response from websocket: ' + JSON.stringify(response));

          this.responseSubject.next(response);
        }
      });

    // Write connect message to the socket.
    this.socketSubject.next(
      new SocketRequest({
        sessionId: sessionId,
        connect: {},
      }).toJSON(),
    );
  }

  private disconnect(): void {
    if (this.socketSubject) {
      if (!this.socketSubject.closed) {
        this.socketSubject.complete();
      }
      this.socketSubject = undefined;
    }
  }
}
