import { Injectable } from '@angular/core';
import { BehaviorSubject, Observable, Subject } from 'rxjs';
import { filter, finalize, map } from 'rxjs/operators';
import io from 'socket.io-client';
import { API_URL } from '../../lib-core/constants/constants';

/**
 * Client for the realtime database ("RTDB") reached through Socket.IO at
 * `API_URL` under the `/rtdb` path.
 *
 * The service does not connect on construction: call {@link init} once
 * (optionally with a bearer token) before using any other method, because
 * every other method emits on the socket created there.
 */
@Injectable({
  providedIn: 'root'
})
export class RealtimeDatabaseService {
  /** Every `data` message received from the socket, for all keys. */
  private rtdbData$: Subject<{ key: string; data: any }> = new Subject<{ key: string; data: any }>();
  /** Per-key streams; an entry exists while a `subscribe` event is outstanding for that key. */
  private observablesMap: { [key: string]: Observable<any> } = {};
  /** Number of live RxJS subscriptions per key, used to decide when to emit `unsubscribe`. */
  private subscriberCounts: { [key: string]: number } = {};

  // Kept as `any` because the file does not import the Socket.IO client types.
  private socket: any;

  private connectedSubject: BehaviorSubject<boolean> = new BehaviorSubject<boolean>(false);
  private connectionErrorSubject: Subject<unknown> = new Subject<unknown>();

  /** Current connection state: `true` after `connect`, `false` after `disconnect`. Never errors. */
  public get $connected(): Observable<boolean> {
    return this.connectedSubject.asObservable();
  }

  /** Emits the error payload of every `connect_error` event raised by the socket. */
  public get $connectionError(): Observable<unknown> {
    return this.connectionErrorSubject.asObservable();
  }

  /**
   * Creates the socket and wires its events into this service.
   *
   * Calling `init` again disconnects the previously created socket first, so
   * that an orphaned connection cannot keep pushing `data` messages.
   *
   * @param token Optional token; when truthy it is sent as `auth.token` in the
   *              form `Bearer <token>`.
   * @returns The created socket instance.
   */
  public async init(token: string | null): Promise<any> {
    if (this.socket) {
      this.socket.disconnect();
    }

    const authOptions = token ? { auth: { token: `Bearer ${token}` } } : {};
    // `io` returns the socket synchronously, so it is assigned before any
    // await point and is usable even if the caller does not await `init`.
    const socket: any = io(API_URL, {
      path: '/rtdb',
      autoConnect: true,
      transports: ['websocket'],
      ...authOptions
    });
    this.socket = socket;

    // Handlers use the captured `socket` rather than `this.socket`, so a stale
    // socket can never emit on a newer one created by a later `init` call.
    socket.on('connect', () => {
      this.connectedSubject.next(true);
      // re-subscribe to events if reconnected
      for (const key of Object.keys(this.observablesMap)) {
        socket.emit('subscribe', key);
      }
    });
    socket.on('connect_error', (err: any) => {
      console.warn('RTDB connection error: ', err);
      // Do NOT call connectedSubject.error(): an errored subject ignores all
      // later next() calls, so a successful reconnect would never be reported.
      this.connectionErrorSubject.next(err);
    });
    socket.on('disconnect', () => {
      this.connectedSubject.next(false);
    });
    socket.on('data', (data: { key: string; data: any }) => {
      this.rtdbData$.next(data);
    });
    return socket;
  }

  /**
   * Emits `get` for `key` and resolves with the `data` field of the acknowledgement.
   */
  public async get<T>(key: string): Promise<T> {
    return this.promisifyEmit<T>('get', key);
  }

  /**
   * Returns a stream of the `data` payloads received for `key`.
   *
   * The first call for a key emits `subscribe` immediately and caches the
   * stream; later calls return the same instance. `unsubscribe` is emitted and
   * the cache entry removed only when the LAST active subscription to the
   * stream is torn down, so consumers sharing a key do not cut each other off.
   *
   * @param key Key to listen on.
   */
  public subscribe<T>(key: string): Observable<T> {
    if (!this.observablesMap[key]) {
      this.observablesMap[key] = this.createKeyStream<T>(key);
      this.socket.emit('subscribe', key);
    }
    return this.observablesMap[key];
  }

  /**
   * Emits `set` with `key` and `value` and resolves with the `data` field of the acknowledgement.
   */
  public async set<T>(key: string, value: T): Promise<T> {
    return this.promisifyEmit<T>('set', key, value);
  }

  /**
   * Emits `update` with `key` and a partial `value` and resolves with the `data`
   * field of the acknowledgement.
   */
  public async update<T>(key: string, value: Partial<T>): Promise<T> {
    return this.promisifyEmit<T>('update', key, value);
  }

  /**
   * Emits `del` for `key` and resolves once the emit is acknowledged.
   */
  public async delete(key: string): Promise<void> {
    await this.promisifyEmit('del', key);
  }

  /**
   * Emits `add-on-disconnect-hook` with `{ key, value }`.
   * NOTE(review): presumably the server applies `value` to `key` when this
   * client disconnects — confirm against the server implementation.
   */
  public addOnDisconnectHook<T>(key: string, value: T | null = null): void {
    this.socket.emit('add-on-disconnect-hook', { key, value });
  }

  /**
   * Emits `remove-on-disconnect-hook` for `key`.
   */
  public removeOnDisconnectHook<T>(key: string): void {
    this.socket.emit('remove-on-disconnect-hook', key);
  }

  /**
   * Builds the reference-counted stream for one key.
   */
  private createKeyStream<T>(key: string): Observable<T> {
    const stream$: Observable<T> = new Observable<T>((subscriber) => {
      if (!this.observablesMap[key]) {
        // This instance was released earlier (its last subscriber left) and is
        // being subscribed again through a retained reference: register anew.
        this.observablesMap[key] = stream$;
        this.socket.emit('subscribe', key);
      }
      this.subscriberCounts[key] = (this.subscriberCounts[key] || 0) + 1;

      return this.rtdbData$
        .pipe(
          filter((message) => message.key === key),
          map((message) => message.data as T),
          finalize(() => this.releaseKey(key))
        )
        .subscribe(subscriber);
    });
    return stream$;
  }

  /**
   * Drops one subscription for `key`; on the last one, forgets the cached
   * stream and emits `unsubscribe`.
   */
  private releaseKey(key: string): void {
    const remaining = (this.subscriberCounts[key] || 0) - 1;
    if (remaining > 0) {
      this.subscriberCounts[key] = remaining;
      return;
    }
    delete this.subscriberCounts[key];
    delete this.observablesMap[key];
    this.socket.emit('unsubscribe', key);
  }

  /**
   * Emits `event` with `args` plus an acknowledgement callback and resolves
   * with the acknowledgement's `data` field.
   *
   * If the acknowledgement carries no payload the promise resolves with
   * `undefined` instead of throwing inside the callback (which would leave the
   * promise pending forever). The promise rejects if the emit itself throws,
   * e.g. because `init` has not been called yet.
   */
  private promisifyEmit<T>(event: string, ...args: any[]): Promise<T> {
    return new Promise<T>((resolve) => {
      this.socket.emit(event, ...args, (res: { key: string; data: T } | null | undefined) => {
        resolve((res ? res.data : undefined) as T);
      });
    });
  }

}
