import { tableFromIPC } from "apache-arrow";
import { tableToIPC } from "apache-arrow";

import { getMetaData } from './performanceMetaData';
import { getAsBuffer, arrowAddDate } from './performanceData';
import { getDB } from './performanceDBCache';
import { ipcBufferToArrow, getDataValidInCache, getDataFromCache, clearInvalidCache, setCacheData } from './dbOperations';

import * as aq from "arquero";

import init, { 
  writeParquet, 
  readParquet, 
  Compression,
  WriterPropertiesBuilder 
} from './parquet';

/**
 * Fetch one dataset slice from the remote source and decode it as an Arrow table.
 *
 * @param {object} request - Descriptor of the slice to fetch.
 * @param {*} request.date - Passed to `getAsBuffer` and echoed back in the result.
 * @param {*} request.dataset - Passed to `getAsBuffer` and echoed back in the result.
 * @param {*} request.identifier - Passed to `getAsBuffer` and echoed back in the result.
 * @param {*} request.id - Not used for the fetch; only echoed back in the result.
 * @returns {Promise<object>} `{ buffer, data, date, dataset, identifier, id }` where
 *   `buffer` is what `getAsBuffer` resolved to and `data` is `tableFromIPC(buffer)`.
 */
const getDataFromRemote = async (request) => {
  const { date, dataset, identifier, id } = request
  const ipcBytes = await getAsBuffer(date, dataset, identifier)

  return {
    buffer: ipcBytes,
    data: tableFromIPC(ipcBytes),
    date,
    dataset,
    identifier,
    id,
  }
}

/**
 * Build an escaped Arquero expression that merges several object-valued columns of a
 * row into one object by summing the values found under each key.
 *
 * For every column name in `cols`, the row's value for that column is read as a plain
 * key/value object and each entry is added to a running total for its key.
 * A column whose value is `null` or `undefined` for the row contributes nothing
 * (instead of throwing), so rows with partially missing data still produce a result.
 *
 * @param {string[]} cols - Names of the object-valued columns to combine.
 * @returns {*} The result of `aq.escape(fn)`, where `fn(row)` returns the per-key sums.
 */
const nestedSum = (cols) => aq.escape((data) => {
  const agg = {}
  for (const col of cols) {
    const subObj = data[col]
    // Object.entries throws on null/undefined; a missing column simply adds nothing.
    if (subObj == null) continue

    for (const [key, value] of Object.entries(subObj)) {
      // Call through the prototype so a data key named "hasOwnProperty" cannot shadow it.
      if (!Object.prototype.hasOwnProperty.call(agg, key)) agg[key] = 0
      agg[key] += value
    }
  }
  return agg
})

/**
 * Load every dataset slice required for a date range, preferring the local cache over
 * the remote source, and return all rows as a single Arquero table.
 *
 * Steps:
 *  1. Ask `getMetaData` which slices are required for the range.
 *  2. Check each required slice against the cache with `getDataValidInCache`.
 *  3. Clear cache entries that missed but still carry a `cache_id`, read the hits from
 *     the cache, and fetch all misses remotely in batches.
 *  4. Write the freshly fetched slices back to the cache with `setCacheData`.
 *  5. Concatenate the rows of every slice and attach a `date` column holding the date
 *     of the slice each row came from.
 *
 * Timing information for the main phases is written via `console.time`/`console.timeEnd`.
 *
 * @param {*} startDate - Start of the range; passed through to `getMetaData`.
 * @param {*} endDate - End of the range; passed through to `getMetaData`.
 * @param {string} dataset - Dataset name; also used for timer labels and to detect
 *   Facebook datasets (`dataset.includes('facebook')`).
 * @returns {Promise<object>} Arquero table with the concatenated rows plus a `date` column.
 */
export const getViaCache = async (startDate, endDate, dataset) => {

  console.time(`open-db-${dataset}`)
  const db = await getDB()
  console.timeEnd(`open-db-${dataset}`)
  const requiredDatasets = await getMetaData(startDate, endDate, dataset) // hourly

  // Check which of the required slices already have a usable cache entry.
  const cacheState = await Promise.all(
    requiredDatasets.map(({ date, identifier, id }) =>
      getDataValidInCache(db)(date, dataset, identifier, id)
    )
  )

  const hasCache = cacheState.filter(row => row.hasCache)
  const missedCache = cacheState.filter(row => !row.hasCache)
  // Misses that still carry a cache_id — presumably stale entries; they are cleared below.
  const invalidCache = missedCache.filter(row => row.cache_id)

  // NOTE: this timer also covers the remote fetch and the cache write-back.
  console.time(dataset + "-read-from-cache")
  await Promise.all(invalidCache.map(clearInvalidCache(db)))
  const dataFromCache = await Promise.all(hasCache.map(getDataFromCache(db)))

  // Fetch the misses in batches so only `chunkSize` remote requests are in flight at once.
  const chunkSize = 50
  const dataFromRemote = []
  for (let start = 0; start < missedCache.length; start += chunkSize) {
    const chunk = missedCache.slice(start, start + chunkSize)
    const response = await Promise.all(chunk.map(getDataFromRemote))
    dataFromRemote.push(...response)
  }

  await Promise.all(dataFromRemote.map(setCacheData(db)))
  console.timeEnd(dataset + "-read-from-cache")

  // Assumes cached results expose `date` and an Arrow `data` table just like the
  // remote results do — TODO confirm against getDataFromCache.
  const results = [...dataFromCache, ...dataFromRemote]

  // One date entry per row, in the same order the rows are concatenated below.
  const dates = results.flatMap(result => Array(result.data.numRows).fill(result['date']))

  console.time(dataset + "-concat")
  const flat = results.flatMap(row => aq.fromArrow(row['data']).objects())
  let concat = aq.from(flat)
    .assign(aq.table({"date": dates}))

  // TODO: remove this. temporary for snapchat until dataset is fixed
  if (concat.columnNames().includes("swipes")) {
    concat = concat
      .rename({"swipes":"clicks"})
  }

  // HACK for Facebook 7D Click + 1D View
  if (dataset.includes('facebook')) {
    concat = concat.derive({
      '7d_click_1d_view': nestedSum(['7d_click', '1d_view']),
      '7d_click_1d_view_value': nestedSum(['7d_click_value', '1d_view_value']),
    })
  }

  console.timeEnd(dataset + "-concat")

  return concat
}
