import { Injectable } from '@angular/core';
import { BehaviorSubject, Observable, of, Subject, timer } from 'rxjs';
import { catchError, filter, map, switchMap, takeUntil, tap } from 'rxjs/operators';
import { AppConfig } from '../app.config';
import { BACKGROUND_NETWORK_REQ_OPTIONS, SESSION_UPDATES_POLL_INTERVAL } from '../constants';
import { HevoEntity } from '../models/hevo-entity';
import {
  getAffectedSessions,
  MockSessionUpdatesObservable,
  SessionUpdate,
  SessionUpdateEntity,
  SessionUpdatesFactory
} from '../models/session-update';
import { RxRequestService } from './rx-request.service';


@Injectable()
export class SessionDataUpdateService {
  private _sessionUpdates$ = new BehaviorSubject<SessionUpdate[]>([]);
  private _needSessionUpdates$ = new BehaviorSubject<void>(null);
  private _destroyed$ = new Subject<void>();

  private _mockSessionUpdatesEffect$ = MockSessionUpdatesObservable().pipe(
    tap((data) => {
      this._sessionUpdates$.next(data);
    })
  );

  private _sessionUpdatesEffect$ = this._needSessionUpdates$.pipe(
    switchMap(() => {
      return timer(0, SESSION_UPDATES_POLL_INTERVAL).pipe(
        switchMap(() => {
          return this._getSessionUpdates().pipe(
            tap((data) => {
              this._sessionUpdates$.next(data);
            }),
            catchError(() => of(null))
          );
        })
      );
    })
  );

  private _userId: number;

  public sessionUpdates$ = this._sessionUpdates$.asObservable();

  constructor(
    private _requestService: RxRequestService,
    private _appConfig: AppConfig
  ) {
  }

  connect() {
    this._sessionUpdatesEffect$.pipe(takeUntil(this._destroyed$)).subscribe();
  }

  /**
   * For the queried entity return observable which fires whenever any update was made.
   *
   * @param entity
   * @param entityId
   * @param secondaryEntity
   */
  onProgressChange(entity?: HevoEntity, entityId?: number, secondaryEntity?: SessionUpdateEntity): Observable<boolean> {
    let lastUpdatedTs = new Date().getTime();
    return this._sessionUpdates$.pipe(
      map((updates) => {
        return getAffectedSessions(updates, entity, entityId, secondaryEntity).reduce((latest, curr) => {
          return curr.updatedAt > latest ? curr.updatedAt : latest;
        }, -1);
      }),
      filter((latestUpdateTs) => {
        return latestUpdateTs > lastUpdatedTs;
      }),
      map((latestUpdateTs) => {
        lastUpdatedTs = latestUpdateTs;
        return true;
      })
    );
  }

  sessionUpdates(entity?: HevoEntity, entityId?: number, secondaryEntity?: SessionUpdateEntity) {
    return this._sessionUpdates$.pipe(
      map((updates) => {
        return getAffectedSessions(updates, entity, entityId, secondaryEntity);
      })
    );
  }

  refresh() {
    this._needSessionUpdates$.next();
  }

  disconnect() {
    this._sessionUpdates$.next([]);
    this._destroyed$.next();
  }

  private _getSessionUpdates(): Observable<SessionUpdate[]> {
    const requestUrl = this._appConfig.getIntegrationsURL() + '/session/users';

    return this._requestService.get(requestUrl, BACKGROUND_NETWORK_REQ_OPTIONS).pipe(
      map((res: any) => SessionUpdatesFactory(res.data.session_data))
    );
  }
}
