import { Injectable } from '@angular/core';
import { CookieService } from 'ngx-cookie-service';
import { BehaviorSubject, merge, Observable, Subject, throwError, timer } from 'rxjs';
import { filter, first, map, retry, share, shareReplay, switchAll, switchMap, tap, throttleTime, timeout, withLatestFrom } from 'rxjs/operators';
import { webSocket } from 'rxjs/webSocket';
import { hasKeys, pickTruthy } from 'Util';
import { AuthenticationService } from './authentication.service';
import { HttpService } from './http-service';
import { AudibleEvent, EventData, LiveUpdateEvent } from './live-update.service.types';

export * from './live-update.service.types';

/**
 * Reports whether the dot-separated `path` starts with every segment of the
 * dot-separated `prefix`.
 *
 * Segments are compared as whole strings, so `a.b` matches `a.b` and `a.b.c`
 * but not `a.bc`. A prefix with more segments than the path never matches.
 *
 * @param prefix - dot-separated filter, e.g. `pass_request.create`
 * @param path - dot-separated value to test against the filter
 * @returns `true` when each prefix segment equals the path segment at the same position
 */
function doesFilterMatch(prefix: string, path: string): boolean {
	const wantedSegments = prefix.split('.');
	const actualSegments = path.split('.');

	// A longer prefix can never be satisfied; otherwise compare position by position.
	return wantedSegments.length <= actualSegments.length && wantedSegments.every((segment, position) => segment === actualSegments[position]);
}

/**
 * Computes a reconnect delay using exponential back-off plus random jitter.
 *
 * The delay is `min * 2^failedCount`, plus a random whole number of milliseconds
 * in the range `[0, min)`, and is never larger than `max`. The jitter staggers
 * clients that were all disconnected at the same moment (for example by a
 * back-end deployment) so they do not all hit the server again at once.
 *
 * @remarks
 * The jitter scales with `min`. If reconnect storms are still seen, widening
 * the jitter range is the knob to turn.
 *
 * @param min - base delay in milliseconds; also the exclusive upper bound of the jitter
 * @param max - cap in milliseconds; the result never exceeds it
 * @param failedCount - exponent applied to 2 when growing the base delay
 * @returns the delay in milliseconds
 */
function getExponentalBackoff(min: number, max: number, failedCount: number): number {
	const exponentialPart = 2 ** failedCount * min;
	const jitter = Math.floor(Math.random() * min);
	const uncapped = exponentialPart + jitter;

	return uncapped < max ? uncapped : Math.min(uncapped, max);
}

/**
 * Shape of a message as it is read off the websocket, before it is handed to
 * consumers as a `LiveUpdateEvent`.
 *
 * NOTE(review): `Event` is not imported in this file, so it presumably resolves
 * to the global DOM `Event` type — confirm that this is intended.
 */
interface RawMessageData<D extends EventData | Event = EventData | Event> {
	// Dot-separated identifier; `LiveUpdateService.listen` matches its filter string against this.
	action: string;
	// Optional free-form text sent with the message (semantics are defined by the server — TODO confirm).
	detail?: string;
	// Optional payload; `listenOnCurrentSchool` looks for a `school_id` key on it.
	data?: D;
	// Optional audible-event descriptor; see `AudibleEvent` in live-update.service.types.
	audible_event?: AudibleEvent;
}

/**
 * Keeps a websocket open to the server reported by `AuthenticationService` and
 * exposes the messages received on it.
 *
 * The connection is (re)established when `forceReconnect()` is called, when the
 * socket errors, or when no message arrives within `MESSAGE_DELAY_GRACE_PERIOD`.
 * Failed connections are retried with an exponential back-off.
 */
@Injectable({
	providedIn: 'root',
})
export class LiveUpdateService {
	/** Passed as `openObserver` to every websocket created by this service. */
	private open$ = new Subject();
	/** Passed as `closeObserver` to every websocket created by this service. */
	private close$ = new Subject();

	/**
	 * An observable that emits its current value on subscribe and on every websocket open
	 * (`true`) or close (`false`) event. It starts out as `true`, before any socket has opened.
	 */
	isConnected$ = new BehaviorSubject(true);

	// controls how long to wait for a message from the server before considering the connection
	// dead. The server sends a heartbeat message every 10 seconds, with a write deadline of 10
	// seconds. In total the server could take up to 20 seconds to send a heartbeat message.
	private static readonly MESSAGE_DELAY_GRACE_PERIOD = 40_000;

	/**
	 * Emits a websocket subject for each server emitted by `authenticationService.server$`.
	 * The socket itself is not opened here; that happens when the subject is subscribed to.
	 * A missing server is mapped to `null`, which `pickTruthy()` is expected to drop.
	 */
	private websocket$$ = this.authenticationService.server$.pipe(
		map((server) => {
			if (server) {
				return webSocket({
					url: server.ws_url,
					openObserver: this.open$,
					closeObserver: this.close$,
				});
			} else {
				return null;
			}
		}),
		shareReplay({ bufferSize: 1, refCount: true }),
		pickTruthy()
	);

	/** Each emission (including the initial one) triggers a fresh connection attempt in `events$`. */
	private forceReconnect$ = new BehaviorSubject<void>(undefined);

	/**
	 * Everything received on the current websocket. One shared connection is kept open
	 * regardless of how many subscribers there are.
	 */
	private events$ = this.forceReconnect$.pipe(
		// collapse bursts of reconnect requests into at most one per 2 seconds
		throttleTime(2000, undefined, { leading: true, trailing: true }),
		tap(() => {
			console.debug('connecting websocket');
		}),
		switchMap(() =>
			this.websocket$$.pipe(
				withLatestFrom(this.httpService.currentSchool$.pipe(pickTruthy())),
				// This ad-hoc cookie fetch is necessary to ensure that a cookie value is set,
				// which is usually the case whenever currentSchool emits.
				map((args) => [...args, this.cookieService.get('smartpassToken')] as const),
				tap(([websocket$, school, spCookie]) => {
					// buffer an authenticate message before any other messages
					websocket$.next({
						action: 'authenticate',
						token: spCookie,
						token_type: 'cookie_value',
						school_id: school.id,
					});
				}),
				map(([webSocket$]) => webSocket$)
			)
		),
		// subscribe to the newest websocket subject (opening it) and drop the previous one
		switchAll(),
		// treat a silent connection as dead so that `retry` below reconnects
		timeout({
			each: LiveUpdateService.MESSAGE_DELAY_GRACE_PERIOD,
			with: ({ seen }) =>
				throwError(
					() => new Error(`more than ${LiveUpdateService.MESSAGE_DELAY_GRACE_PERIOD}ms have elapsed since last message. ${seen} messages received`)
				),
		}),
		retry({
			delay: (error, retryCount) => {
				console.debug('websocket error', error);
				const duration = getExponentalBackoff(1_000, 30_000, retryCount);
				console.debug('restarting connection in ms', duration);
				return timer(duration);
			},
			// a received message resets the retry counter, so the back-off starts over
			resetOnSuccess: true,
		}),
		share({
			resetOnError: false, // should never happen given the above retry
			resetOnComplete: true,
			resetOnRefCountZero: false, // keep connection open regardless of number of subscribers
		})
	);

	/** `events$` narrowed to values that carry an `action` key. */
	private messages$ = this.events$.pipe(
		// TODO: better validation
		filter((event): event is RawMessageData<EventData> => hasKeys(event, ['action']))
	);

	constructor(private httpService: HttpService, private cookieService: CookieService, private authenticationService: AuthenticationService) {
		this.open$.subscribe((_) => {
			console.debug('websocket opened');
		});

		// drive isConnected$ from the socket lifecycle: open => true, close => false
		merge(this.open$.pipe(map(() => true)), this.close$.pipe(map(() => false))).subscribe(this.isConnected$);
	}

	/**
	 * Requests a new connection attempt. Requests are throttled to one per 2 seconds
	 * (leading and trailing) inside `events$`.
	 */
	forceReconnect(): void {
		this.forceReconnect$.next(undefined);
	}

	/**
	 * Listens for incoming messages.
	 *
	 * @param filterString - optional dot-separated prefix; only messages whose `action`
	 * starts with these segments are emitted. When omitted or empty, every message is emitted.
	 */
	listen(filterString?: string): Observable<LiveUpdateEvent> {
		return this.messages$.pipe(filter((data) => !filterString || doesFilterMatch(filterString, data.action)));
	}

	/**
	 * Like `listen`, but drops messages whose `data.school_id` differs from the school
	 * returned by `httpService.getSchool()`. Messages without a `school_id`, and all
	 * messages while no school is set, pass through.
	 *
	 * @param filterString - optional dot-separated `action` prefix, as for `listen`
	 */
	listenOnCurrentSchool(filterString?: string): Observable<LiveUpdateEvent> {
		return this.listen(filterString).pipe(
			filter((e) => {
				const school = this.httpService.getSchool();
				return !school || !hasKeys(e?.data, ['school_id']) || e?.data?.school_id === school.id;
			})
		);
	}

	/**
	 * Sends `{ action, data }` on the first websocket subject emitted by `websocket$$`.
	 *
	 * NOTE(review): if `events$` holds no live subscription to `websocket$$` at that moment
	 * (e.g. during a retry back-off), this appears to write to a subject that is never
	 * connected, so the message would be dropped — confirm whether that matters to callers.
	 */
	sendMessage(action: string, data?: string | null): void {
		this.websocket$$.pipe(first()).subscribe((webSocket$) => {
			webSocket$.next({ action, data });
		});
	}

	/**
	 * Returns an operator that subscribes to `source` immediately and subscribes to it again
	 * each time the websocket subsequently opens. A re-subscription replaces the previous
	 * re-subscription; the initial subscription is left running.
	 */
	restartOnConnected(): <T>(source: Observable<T>) => Observable<T> {
		return <T>(source: Observable<T>): Observable<T> =>
			merge(
				source,
				this.isConnected$.pipe(
					// isConnected$ is a BehaviorSubject, so it replays its current value synchronously
					// on subscribe. `source` is already subscribed above; reacting to that replayed
					// value (index 0) would subscribe to it a second time straight away. Only later
					// transitions to connected should restart the source.
					filter((connected, index) => index > 0 && connected),
					switchMap(() => source)
				)
			);
	}
}
