"""
Async I/O Manager for efficient data handling
Supports Parquet, JSON, and optimized file operations
"""

import asyncio
import functools
import json
import pickle
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import Any, Dict, Tuple, Union

import aiofiles
import numpy as np

try:
    import pyarrow as pa
    import pyarrow.parquet as pq
    HAS_PYARROW = True
except ImportError:
    HAS_PYARROW = False

from config import ProcessingConfig


class AsyncIOManager:
    """Async I/O manager with Parquet and optimized file operations"""

    def __init__(self, config: ProcessingConfig):
        self.config = config
        self.executor = ThreadPoolExecutor(max_workers=config.max_workers)

    async def save_vectors_optimized(self, vectors: np.ndarray, metadata: Dict, output_dir: Union[str, Path]):
        """Save vectors and metadata using optimal format"""
        output_dir = Path(output_dir)
        output_dir.mkdir(parents=True, exist_ok=True)

        # Ensure float32 for compact storage
        vectors = vectors.astype(np.float32, copy=False)

        if self.config.use_parquet and HAS_PYARROW:
            await self._save_as_parquet(vectors, metadata, output_dir)
        else:
            await self._save_as_numpy(vectors, metadata, output_dir)

    async def _save_as_parquet(self, vectors: np.ndarray, metadata: Dict, output_dir: Path):
        """Save as Parquet for optimal I/O performance"""
        loop = asyncio.get_running_loop()

        # Convert to PyArrow Table
        vector_columns = {f"dim_{i}": vectors[:, i] for i in range(vectors.shape[1])}
        table = pa.table(vector_columns)

        # Add metadata
        metadata_dict = {"metadata": json.dumps(metadata)}
        table = table.replace_schema_metadata(metadata_dict)

        # Write off the event loop; run_in_executor does not forward keyword
        # arguments, so bind them with functools.partial
        await loop.run_in_executor(
            self.executor,
            functools.partial(
                pq.write_table,
                table,
                str(output_dir / "vectors.parquet"),
                compression='zstd' if self.config.compression else None,
            ),
        )

    async def _save_as_numpy(self, vectors: np.ndarray, metadata: Dict, output_dir: Path):
        """Save as a plain .npy file with a JSON metadata sidecar"""
        loop = asyncio.get_running_loop()

        # Save vectors
        await loop.run_in_executor(
            self.executor,
            np.save,
            str(output_dir / "vectors.npy"),
            vectors
        )

        # Save metadata asynchronously
        async with aiofiles.open(output_dir / "metadata.json", 'w') as f:
            await f.write(json.dumps(metadata, indent=2))

    async def load_vectors(self, input_dir: Union[str, Path]) -> Tuple[np.ndarray, Dict]:
        """Load vectors and metadata"""
        input_dir = Path(input_dir)

        if (input_dir / "vectors.parquet").exists() and HAS_PYARROW:
            return await self._load_from_parquet(input_dir)
        elif (input_dir / "vectors.npy").exists():
            return await self._load_from_numpy(input_dir)
        else:
            raise FileNotFoundError(f"No vector file found in {input_dir}")

    async def _load_from_parquet(self, input_dir: Path) -> Tuple[np.ndarray, Dict]:
        """Load from Parquet format"""
        loop = asyncio.get_running_loop()

        # Read table
        table = await loop.run_in_executor(
            self.executor,
            pq.read_table,
            str(input_dir / "vectors.parquet")
        )

        # Extract metadata (schema.metadata is None when nothing was attached)
        schema_meta = table.schema.metadata or {}
        metadata = json.loads(schema_meta.get(b'metadata', b'{}').decode())

        # Reassemble the (n_rows, n_dims) array from the dim_i columns
        vectors = np.column_stack([
            table.column(f"dim_{i}").to_numpy() for i in range(table.num_columns)
        ])

        return vectors, metadata

    async def _load_from_numpy(self, input_dir: Path) -> Tuple[np.ndarray, Dict]:
        """Load from numpy format"""
        loop = asyncio.get_running_loop()

        # Load vectors
        vectors = await loop.run_in_executor(
            self.executor,
            np.load,
            str(input_dir / "vectors.npy")
        )

        # Load metadata
        async with aiofiles.open(input_dir / "metadata.json", 'r') as f:
            metadata = json.loads(await f.read())

        return vectors, metadata

    async def update_processed_files(self, file_path: Union[str, Path], processed_files: Dict):
        """Async update of processed_files.json"""
        file_path = Path(file_path)

        async with aiofiles.open(file_path, 'w') as f:
            await f.write(json.dumps(processed_files, indent=2))

    async def save_json_async(self, data: Dict, file_path: Union[str, Path]):
        """Async JSON save"""
        file_path = Path(file_path)

        async with aiofiles.open(file_path, 'w') as f:
            await f.write(json.dumps(data, indent=2))

    async def load_json_async(self, file_path: Union[str, Path]) -> Dict:
        """Async JSON load"""
        file_path = Path(file_path)

        if not file_path.exists():
            return {}

        async with aiofiles.open(file_path, 'r') as f:
            content = await f.read()

        return json.loads(content) if content else {}

    async def save_pickle_async(self, data: Any, file_path: Union[str, Path]):
        """Async pickle save"""
        file_path = Path(file_path)
        loop = asyncio.get_running_loop()

        def _dump():
            # Open inside the worker thread so the handle is always closed
            with open(file_path, 'wb') as f:
                pickle.dump(data, f)

        await loop.run_in_executor(self.executor, _dump)

    async def load_pickle_async(self, file_path: Union[str, Path]) -> Any:
        """Async pickle load"""
        file_path = Path(file_path)
        loop = asyncio.get_running_loop()

        def _load():
            # Open inside the worker thread so the handle is always closed
            with open(file_path, 'rb') as f:
                return pickle.load(f)

        return await loop.run_in_executor(self.executor, _load)

    def close(self):
        """Clean up resources"""
        self.executor.shutdown(wait=True)
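

# Minimal usage sketch (illustrative only): round-trips a small vector matrix
# through save_vectors_optimized/load_vectors. It assumes ProcessingConfig
# accepts max_workers, use_parquet, and compression keyword arguments, as the
# methods above imply; adapt to the actual class in config.py.
if __name__ == "__main__":
    async def _demo():
        config = ProcessingConfig(max_workers=4, use_parquet=True, compression=True)
        manager = AsyncIOManager(config)
        try:
            vectors = np.random.rand(100, 8).astype(np.float32)
            await manager.save_vectors_optimized(vectors, {"model": "demo"}, "vector_store")
            loaded, meta = await manager.load_vectors("vector_store")
            print(loaded.shape, meta)
        finally:
            manager.close()

    asyncio.run(_demo())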
