import {
  catchError,
  distinctUntilChanged,
  distinctUntilKeyChanged,
  filter,
  forkJoin,
  from,
  map,
  merge,
  Observable,
  of,
  switchMap,
  withLatestFrom,
} from "rxjs";
import { Client, DsError } from "ds";
import { flow, identity } from "fp-ts/function";
import { getDataType } from "ds/DataTypes";
import { createRepository, getRepositories } from "ds/Repositories";
import * as E from "fp-ts/Either";
import { DataType } from "types/src/DataType/DataType";
import { sequenceT } from "fp-ts/Apply";
import { extractFieldsFromSchema } from "../../../../../../../../generic-states/SchemaFields/utils";
import { Epic } from "../../../../../../../../types/RootEpic";
import { dsErrorNotification } from "../../../../../../../Notifications/epic";
import * as State from "./types/State";
import * as Actions from "./types/Actions";
import { parentSearchState, schemaFieldsState } from "./utils";

export const epic: Epic<
  Actions.Actions,
  State.State,
  { pyckAdminClient$: Observable<Client> }
> = (state$, { pyckAdminClient$ }) => {
  const fieldsSchema$ = schemaFieldsState.epic(
    state$.pipe(
      filter(State.isLoaded),
      map((s) => s.payload.schema),
    ),
    pyckAdminClient$,
  );

  const loading$: Observable<Actions.Actions> = state$.pipe(
    filter(State.isLoading),
    map((s) => s.payload.dataTypeId),
    distinctUntilChanged(),
    withLatestFrom(pyckAdminClient$),
    switchMap(([dataTypeId, client]) => {
      return forkJoin({
        dataTypes: from(getDataType(client, dataTypeId)).pipe(
          map(E.mapLeft<DsError, Actions.Actions>(Actions.loadFail)),
          map(E.chain(E.fromNullable<Actions.Actions>(Actions.loadFail()))),
          map(
            E.filterOrElse<DataType, Actions.Actions>(
              (v) => v.entity === "repository",
              Actions.loadFail,
            ),
          ),
          map(E.map((v) => v.schema)),
        ),
        repositories: from(getRepositories(client, {})).pipe(
          map(E.mapLeft<DsError, Actions.Actions>(Actions.loadFail)),
          map(E.chain(E.fromNullable<Actions.Actions>(Actions.loadFail()))),
          map(E.map((v) => v.items)),
        ),
      }).pipe(
        map(({ dataTypes, repositories }) =>
          sequenceT(E.Apply)(dataTypes, repositories),
        ),
        map(
          E.map(([schema, repositories]) =>
            Actions.loadSuccess({
              schema,
              repositories,
            }),
          ),
        ),
        map(E.getOrElse(identity)),
      );
    }),
  );

  const create$ = state$.pipe(
    distinctUntilKeyChanged("type"),
    filter(State.isSaving),
    map((s) => s.payload),
    withLatestFrom(pyckAdminClient$),
    switchMap(([s, client]) =>
      from(
        createRepository(client, {
          name: s.name.value,
          type: s.type.value,
          dataTypeId: s.dataTypeId,
          isVirtual: s.isVirtual,
          parent: parentSearchState.states.selected.is(s.parent)
            ? s.parent.payload.item?.id
            : undefined,
          fields: extractFieldsFromSchema(
            "Ready:DataManager:Repositories:Create",
          )(s.schema.payload.values),
        }),
      ).pipe(
        dsErrorNotification(
          flow(
            E.map(Actions.saveSuccess),
            E.getOrElse<DsError, Actions.Actions>(Actions.saveError),
          ),
        ),
        catchError(() => of(Actions.saveError())),
      ),
    ),
  );

  const parentSearch$ = pyckAdminClient$.pipe(
    switchMap((client) =>
      parentSearchState.epic(
        state$.pipe(
          filter(State.isReady),
          map((s) => s.payload.parent),
        ),
        {
          get: (q) =>
            getRepositories(client, { where: { name: q } }).then(
              flow(
                E.map((r) => r.items),
                E.mapLeft(() => "unknown"),
              ),
            ),
        },
      ),
    ),
  );

  return merge(loading$, create$, fieldsSchema$, parentSearch$);
};
