import { END, eventChannel } from 'redux-saga';
import { call, cancelled, put, take, takeLatest } from 'redux-saga/effects';
import * as RealtimeActionTypes from './RealtimeActionTypes';
import RealtimeEventTypes from './RealtimeEventTypes';
import { getWebsocketClient } from './sockets/websocket';

// Most recently created realtime event channel. Assigned by
// connectToRealtimeService and closed by disconnectRealtimeService;
// stays undefined until the first connect runs.
let channel;

/**
 * Creates a redux-saga event channel fed by the websocket client.
 *
 * Incoming messages are emitted as
 * `{ type: RealtimeEventTypes.message, payload }` and connection
 * notifications as `{ type: RealtimeEventTypes.connection }`. If the client
 * cannot be created, `END` is emitted instead so the channel closes.
 *
 * @returns {object} The channel produced by `eventChannel`. Its unsubscribe
 *   handler disconnects the websocket client when one was created.
 */
function realtimeChannel() {
  let websocketClient;
  return eventChannel(emitter => {
    try {
      websocketClient = getWebsocketClient({
        // The topic is not forwarded; only the payload is emitted.
        onMessageFn: (topic, payload) => {
          emitter({ type: RealtimeEventTypes.message, payload });
        },
        onConnectFn: () => {
          emitter({ type: RealtimeEventTypes.connection });
        },
      });
    } catch (error) {
      // Client creation failed: end the channel so takers stop waiting.
      // The error itself is not surfaced to the consumer.
      emitter(END);
    }
    return async () => {
      // When getWebsocketClient threw there is no client to disconnect;
      // without this guard the unsubscribe would reject with a TypeError.
      if (!websocketClient) {
        return;
      }
      await websocketClient.disconnect();
    };
  });
}

/**
 * Saga that opens the realtime channel and translates its events into
 * Redux actions until the channel ends, the saga is cancelled, or an
 * error is thrown.
 *
 * - A `RealtimeEventTypes.connection` event dispatches `REALTIME_CONNECTED`.
 * - A `RealtimeEventTypes.message` event dispatches `REALTIME_DIGEST` with
 *   the JSON-parsed payload as `data`.
 *
 * The created channel is also stored in the module-level `channel` so that
 * disconnectRealtimeService can close it.
 *
 * @throws {SyntaxError} When a message payload is not valid JSON; the
 *   channel is closed before the error propagates.
 */
export function* connectToRealtimeService() {
  // Keep a local reference: the module-level `channel` may be reassigned by
  // a later connect, and this saga must only ever close the one it opened.
  const realtimeEvents = yield call(realtimeChannel);
  channel = realtimeEvents;
  try {
    while (true) {
      const message = yield take(realtimeEvents);
      if (message.type === RealtimeEventTypes.connection) {
        yield put({
          type: RealtimeActionTypes.REALTIME_CONNECTED,
        });
      } else if (message.type === RealtimeEventTypes.message) {
        yield put({
          type: RealtimeActionTypes.REALTIME_DIGEST,
          data: JSON.parse(message.payload.toString()),
        });
      }
    }
  } finally {
    // Close on every exit path (cancellation, thrown error, channel END) so
    // the underlying connection is never left open. Closing a channel that
    // has already been closed is a no-op in redux-saga.
    realtimeEvents.close();
  }
}

/**
 * Saga that closes the current realtime channel, if one has been created.
 * Does nothing when no channel exists yet.
 */
export function* disconnectRealtimeService() {
  // Nothing to tear down before the first connect.
  if (!channel) {
    return;
  }
  const closeResult = channel.close();
  yield closeResult;
}

/**
 * Root realtime saga: registers the connect and disconnect workers so that
 * only the latest REALTIME_CONNECT / REALTIME_DISCONNECT action of each kind
 * is handled.
 */
export default function* RealtimeApplySaga() {
  // Action type -> worker saga, registered in this order.
  const watchers = [
    [RealtimeActionTypes.REALTIME_CONNECT, connectToRealtimeService],
    [RealtimeActionTypes.REALTIME_DISCONNECT, disconnectRealtimeService],
  ];
  for (const [actionType, worker] of watchers) {
    yield takeLatest(actionType, worker);
  }
}
