import {
  ModelEditor,
  ModelEditorNodeType,
  ModelEditorNode,
  ModelEditorNodeDomain,
  ModelEditorNodeJoin,
  ModelEditorNodeTransformation,
  ModelEditorNodeResult,
  ModelEditorNodeGroupBy,
  ModelEditorNodeUnion,
  ModelEditorNodeUnionType,
  ModelEditorNodeUnionMode,
  ModelEditorNodePivot,
  ModelEditorNodeUnPivot,
  ModelEditorNodeSql,
} from '@modules/modelEditor/ModelEditorTypes';
import { encodeBase64 } from './modeEditorSharedUtils';
import { getFullTableName } from './modelEditorUtils';

export const compileSparkInstruction = (
  data: ModelEditor,
  id: string,
  fallbackCHDB: string,
  cluster: string,
): SparkInstruction => {
  return {
    id,
    config: JSON.stringify(prepareConfigData(data, fallbackCHDB, cluster)),
  };
};

const escapeName = (name: string) => `\`${name}\``;

const dataConverter: Record<
  ModelEditorNodeType,
  (data: ModelEditorNode, fallbackCHDB: string, cluster: string) => SparkInstructionNodes
> = {
  domain: ({ id, data }, fallbackCHDB): SparkInstructionNodeCHRead => {
    const nodeData = data as ModelEditorNodeDomain;
    return {
      id,
      value: {
        operation: 'READ',
        customSql: 'true',
        storage: 'clickhousejdbc',
        jdbcUrl: `#CLICKHOUSE_JDBC_URL#`,
        user: `#CLICKHOUSE_USER#`,
        password: `#CLICKHOUSE_PASSWORD#`,
        'option.dbtable': `select * from ${getFullTableName(nodeData.tableName, fallbackCHDB)}`,
      },
    };
  },
  join: ({ id, data, children }): SparkInstructionNodeJoin => {
    const nodeData = data as ModelEditorNodeJoin;
    return {
      id,
      value: {
        operation: 'JOIN',
        joinType: (['left', 'right'].includes(nodeData.type)
          ? nodeData.type
          : 'ignore') as SparkInstructionNodeJoin['value']['joinType'],
        leftDataset: children[0],
        rightDataset: children[1],
        columns: nodeData.relations.join(','),
        dropDuplicateColumns: 'true',
      },
    };
  },
  union: ({ id, data, children }): SparkInstructionNodeUnion => {
    const nodeData = data as ModelEditorNodeUnion;

    return {
      id,
      value: {
        operation: 'UNION',
        operationType: 'union',
        type: nodeData.type,
        mode: nodeData.mode,
      },
    };
  },
  result: ({ id, data }, fallbackCHDB, cluster: string): SparkInstructionNodeCHWrite => {
    const nodeData = data as ModelEditorNodeResult;
    const escapedPKNames = nodeData.primaryKeys?.map(escapeName);
    const distributedShardingKeys = escapedPKNames?.map((item) => `coalesce(toString(${item}),'0')`)?.join(',')!;
    return {
      id,
      value: {
        operation: 'WRITE',
        storage: 'clickhousejdbc',
        jdbcUrl: `#CLICKHOUSE_JDBC_URL#`,
        user: `#CLICKHOUSE_USER#`,
        password: `#CLICKHOUSE_PASSWORD#`,
        table: `${getFullTableName(nodeData.tableName, fallbackCHDB)}`,
        writeMode: 'overwrite',
        'option.truncate': 'true',
        'option.nullAsDefault': '2',
        'option.createTableOptions': `ENGINE=ReplacingMergeTree ORDER BY (${escapedPKNames?.join(',')})`,
        'option.isDistributed': 'true',
        'option.engine': 'ReplacingMergeTree',
        'option.mergingKeys': escapedPKNames?.join(',')!,
        'option.distributedShardingKeys': encodeBase64(`xxHash32(${distributedShardingKeys})`),
        'option.distributedSettings': 'fsync_after_insert=1, fsync_directories=1',
        'option.cluster': cluster,
      },
    };
  },
  // result: ({ id, data }): SparkInstructionNodeStdOut => {
  //   return {
  //     id,
  //     value: {
  //       operation: 'WRITE',
  //       quantity: '100',
  //       storage: 'STDOUT',
  //     },
  //   };
  // },
  transform: ({ id, data }): SparkInstructionNodeTransform => {
    const nodeData = data as ModelEditorNodeTransformation;

    const statement = nodeData.transformation
      .filter((item) => !item.deleted)
      .map((item) => {
        let name = escapeName(item.name);

        if (item.newName) {
          return `${item.expression || name} as ${escapeName(item.newName)}`;
        }
        return item.expression ? `${item.expression} as ${name}` : name;
      })
      .join(', ');

    return {
      id,
      value: {
        mode: 'Simple',
        operation: 'TRANSFORM',
        statement: encodeBase64(statement),
      },
    };
  },
  groupBy: ({ id, data }): SparkInstructionNodeGroupBy => {
    const nodeData = data as ModelEditorNodeGroupBy;

    return {
      id,
      value: {
        dropGroupingColumns: nodeData.dropGroupingColumns ? 'true' : 'false',
        groupingColumns: nodeData.groupBy.join(','),
        groupingCriteria: nodeData.aggregateFn
          .map((item) => `${item.variable}:${item.operation.toLowerCase()}`)
          .join(','),
        operation: 'GROUP',
      },
    };
  },
  pivot: ({ id, data }): SparkInstructionNodePivot => {
    const nodeData = data as ModelEditorNodePivot;

    return {
      id,
      value: {
        groupingColumns: nodeData.groupingColumns.join(','),
        pivotColumn: nodeData.pivotColumn,
        aggregateFunction: nodeData.aggregateFn,
        columns: nodeData.columns.join(','),
        translatedValues: nodeData.translatedValues?.length ? nodeData.translatedValues.join(',') : undefined,
        operation: 'PIVOT',
      },
    };
  },
  unpivot: ({ id, data }): SparkInstructionNodeUnPivot => {
    const nodeData = data as ModelEditorNodeUnPivot;

    return {
      id,
      value: {
        'option.unchangedColumns': nodeData.unchangedColumns.join(','),
        'option.unpivotColumns': nodeData.unpivotColumns.join(','),
        'option.unpivotNames': nodeData.unpivotNames.join(','),
        operationType: 'unpivot',
        operation: 'UNPIVOT',
      },
    };
  },
  sql: ({ id, data }) => {
    const nodeData = data as ModelEditorNodeSql;
    return {
      id,
      value: {
        operation: 'READ',
        customSql: 'true',
        storage: 'clickhousejdbc',
        jdbcUrl: `#CLICKHOUSE_JDBC_URL#`,
        user: `#CLICKHOUSE_USER#`,
        password: `#CLICKHOUSE_PASSWORD#`,
        'option.dbtable': nodeData.sql_statement,
      },
    };
  },
};

const prepareConfigData = (data: ModelEditor, fallbackCHDB: string, cluster: string): SparkInstructionConfig => {
  const nodes: SparkInstructionConfig['nodes'] = [];
  const edges: SparkInstructionConfig['edges'] = [];

  data.forEach((item) => {
    nodes.push(dataConverter[item.type](item, fallbackCHDB, cluster));
    if (Array.isArray(item.children)) {
      item.children.forEach((child) => {
        edges.push({
          value: {
            operation: 'EDGE',
          },
          target: item.id,
          source: child,
        });
      });
    }
  });

  return { nodes, edges };
};

interface SparkInstructionNodeEmpty {
  id: string;
}

interface SparkInstructionNodeReadStaticData {
  id: string;
  value: {
    operation: 'READ';
    schema?: string; // Base64
    data: string; // JSON.stringify
    storage: 'dataframe';
  };
}

interface SparkInstructionNodeStdOut {
  id: string;
  value: {
    storage: 'STDOUT';
    quantity: string; // '100'
    operation: 'WRITE';
  };
}

interface SparkInstructionNodeCHRead {
  id: string;
  value: {
    password: string;
    jdbcUrl: string;
    customSql: string;
    'option.dbtable': string;
    storage: 'clickhousejdbc';
    operation: 'READ';
    user: string;
  };
}

interface SparkInstructionNodeCHWrite {
  id: string;
  value: {
    password: string;
    jdbcUrl: string;
    'option.nullAsDefault'?: '2';
    'option.createTableOptions'?: string;
    'option.dbtable'?: string;
    'option.distributedShardingKeys': string;
    'option.distributedSettings': 'fsync_after_insert=1, fsync_directories=1';
    'option.mergingKeys': string;
    'option.cluster': string;
    'option.isDistributed'?: 'true';
    'option.engine': 'ReplacingMergeTree';
    'option.truncate'?: 'true';
    storage: 'clickhousejdbc';
    writeMode?: 'append' | 'overwrite';
    operation: 'WRITE';
    user: string;
    table?: string;
  };
}

interface SparkInstructionNodeTransform {
  id: string;
  value: {
    statement: string;
    mode: 'Simple' | 'Full_SQL' | 'Statement' | 'Drop' | 'Command';
    operation: 'TRANSFORM';
  };
}

interface SparkInstructionNodeGroupBy {
  id: string;
  value: {
    dropGroupingColumns: 'true' | 'false';
    groupingCriteria: string;
    groupingColumns: string;
    operation: 'GROUP';
  };
}

interface SparkInstructionNodePivot {
  id: string;
  value: {
    groupingColumns: string;
    pivotColumn: string;
    aggregateFunction: string;
    columns: string;
    translatedValues?: string;
    operation: 'PIVOT';
  };
}

interface SparkInstructionNodeUnPivot {
  id: string;
  value: {
    'option.unchangedColumns': string;
    'option.unpivotColumns': string;
    'option.unpivotNames': string;
    operationType: 'unpivot';
    operation: 'UNPIVOT';
  };
}

interface SparkInstructionNodeJoin {
  id: string;
  value: {
    operation: 'JOIN';
    joinType: 'left' | 'right' | 'ignore' | 'cross';
    leftDataset: string;
    rightDataset: string;
    dropDuplicateColumns: 'true' | 'false';
    columns?: string; // 'user_id'
    leftColumns?: string;
    rightColumns?: string;
  };
}

interface SparkInstructionNodeUnion {
  id: string;
  value: {
    operation: 'UNION';
    operationType: 'union';
    type: ModelEditorNodeUnionType;
    mode: ModelEditorNodeUnionMode;
  };
}

type SparkInstructionNodes =
  | SparkInstructionNodeEmpty
  | SparkInstructionNodeJoin
  | SparkInstructionNodeReadStaticData
  | SparkInstructionNodeStdOut
  | SparkInstructionNodeGroupBy
  | SparkInstructionNodeCHRead
  | SparkInstructionNodeCHWrite;

type SparkInstructionEdge = {
  value: {
    operation: 'EDGE';
  };
  source: string;
  target: string;
};

interface SparkInstructionConfig {
  nodes: SparkInstructionNodes[];
  edges: SparkInstructionEdge[];
}

interface SparkInstruction {
  id: string;
  config: string;
}
