# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# type: ignore[abstract]

from __future__ import annotations

import abc
import functools
import multiprocessing
import os
import re
import shutil
import subprocess
import time
from typing import TYPE_CHECKING, Callable, Literal, TypedDict, TypeVar

# (TODO: GhostScreaming) It will be removed later.
from paddle.base import core

from .log_util import logger

if TYPE_CHECKING:
    from typing_extensions import ParamSpec

    _InputT = ParamSpec("_InputT")
    _RetT = TypeVar("_RetT")

    _HDFSClientConfig = TypedDict(
        '_HDFSClientConfig', {'fs.default.name': str, 'hadoop.job.ugi': str}
    )

    class _FileInfo(TypedDict):
        path: str
        size: int


__all__ = []


class ExecuteError(Exception):
    pass


class FSFileExistsError(Exception):
    pass


class FSFileNotExistsError(Exception):
    pass


class FSTimeOut(Exception):
    pass


class FSShellCmdAborted(ExecuteError):
    pass


class FS:
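    """Abstract file system interface implemented by LocalFS, HDFSClient and AFSClient."""
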
    @abc.abstractmethod
    def ls_dir(self, fs_path):
        raise NotImplementedError

    @abc.abstractmethod
    def is_file(self, fs_path):
        raise NotImplementedError

    @abc.abstractmethod
    def is_dir(self, fs_path):
        raise NotImplementedError

    @abc.abstractmethod
    def is_exist(self, fs_path):
        raise NotImplementedError

    @abc.abstractmethod
    def upload(self, local_path, fs_path):
        raise NotImplementedError

    @abc.abstractmethod
    def download(self, fs_path, local_path):
        raise NotImplementedError

    @abc.abstractmethod
    def mkdirs(self, fs_path):
        raise NotImplementedError

    @abc.abstractmethod
    def delete(self, fs_path):
        raise NotImplementedError

    @abc.abstractmethod
    def need_upload_download(self):
        raise NotImplementedError

    @abc.abstractmethod
    def rename(self, fs_src_path, fs_dst_path):
        raise NotImplementedError

    @abc.abstractmethod
    def mv(self, fs_src_path, fs_dst_path, overwrite=False, test_exists=False):
        raise NotImplementedError

    @abc.abstractmethod
    def upload_dir(self, local_dir, dest_dir):
        raise NotImplementedError

    @abc.abstractmethod
    def list_dirs(self, fs_path):
        raise NotImplementedError

    @abc.abstractmethod
    def touch(self, fs_path, exist_ok=True):
        raise NotImplementedError

    @abc.abstractmethod
    def cat(self, fs_path=None):
        raise NotImplementedError


class LocalFS(FS):
    """
    A tool for the local file system.

    Examples:
        .. code-block:: python

            >>> # doctest: +REQUIRES(env:DISTRIBUTED)
            >>> from paddle.distributed.fleet.utils import LocalFS

            >>> client = LocalFS()
            >>> subdirs, files = client.ls_dir("./")

    """

    def ls_dir(self, fs_path: str) -> tuple[list[str], list[str]]:
        """
        List directories and files under `fs_path` .

        Args:
            fs_path(str): The local file path.

        Returns:
            Tuple: Return a 2-tuple; the first element is a list of all its subdirectories,
            and the second is a list of all its files, e.g. ([subdirname1, subdirname2, ...], [filename1, filename2, ...]).

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.distributed.fleet.utils import LocalFS

                >>> client = LocalFS()
                >>> subdirs, files = client.ls_dir("./")

        """
        if not self.is_exist(fs_path):
            return [], []

        dirs = []
        files = []
        for f in os.listdir(fs_path):
            if os.path.isdir(fs_path + "/" + f):
                dirs.append(f)
            else:
                files.append(f)

        return dirs, files

    def mkdirs(self, fs_path: str) -> None:
        """
        Create a local directory.

        Args:
            fs_path(str): The local directory path.

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.distributed.fleet.utils import LocalFS

                >>> client = LocalFS()
                >>> client.mkdirs("test_mkdirs")
                >>> client.delete("test_mkdirs")

        """
        assert not os.path.isfile(fs_path), f"{fs_path} is already a file"
        os.makedirs(fs_path, exist_ok=True)

    def rename(self, fs_src_path: str, fs_dst_path: str) -> None:
        """
        Rename the file.

        Args:
            fs_src_path(str): The current name of the file or directory.
            fs_dst_path(str): The new name of the file or directory.

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.distributed.fleet.utils import LocalFS

                >>> client = LocalFS()
                >>> client.touch("test_rename_src")
                >>> print(client.is_exist("test_rename_src"))
                True
                >>> client.rename("test_rename_src", "test_rename_dst")
                >>> print(client.is_exist("test_rename_src"))
                False
                >>> print(client.is_exist("test_rename_dst"))
                True
                >>> client.delete("test_rename_dst")

        """
        os.rename(fs_src_path, fs_dst_path)

    def _rmr(self, fs_path):
        shutil.rmtree(fs_path)

    def _rm(self, fs_path):
        os.remove(fs_path)

    def delete(self, fs_path: str) -> None:
        """
        Delete the local file path, whether it's a file or directory.

        Args:
            fs_path(str): The local file path.

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.distributed.fleet.utils import LocalFS

                >>> client = LocalFS()
                >>> client.mkdirs("test_localFS_mkdirs")
                >>> client.delete("test_localFS_mkdirs")

        """
        if not self.is_exist(fs_path):
            return

        if os.path.isfile(fs_path):
            return self._rm(fs_path)

        return self._rmr(fs_path)

    def need_upload_download(self) -> Literal[False]:
        return False

    def is_file(self, fs_path: str) -> bool:
        """
        Whether the local file path is a file.

        Args:
            fs_path(str): The local file path.

        Returns:
            Bool: Return true if the path exists and it's a file, otherwise return false.

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.distributed.fleet.utils import LocalFS

                >>> client = LocalFS()
                >>> client.touch("test_is_file")
                >>> print(client.is_file("test_is_file"))
                True
                >>> client.delete("test_is_file")

        """
        return os.path.isfile(fs_path)

    def is_dir(self, fs_path: str) -> bool:
        """
        Whether the local file path is a directory.

        Args:
            fs_path(str): The local file path.

        Returns:
            Bool: Return true if the path exists and it's a directory, otherwise return false.

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.distributed.fleet.utils import LocalFS

                >>> client = LocalFS()
                >>> client.mkdirs("test_is_dir")
                >>> print(client.is_dir("test_is_dir"))
                True
                >>> client.delete("test_is_dir")

        """
        return os.path.isdir(fs_path)

    def is_exist(self, fs_path: str) -> bool:
        """
        Whether the local file path exists.

        Args:
            fs_path(str): The local file path.

        Returns:
            Bool: Return true if the path exists, whether it is a file or a directory;
            otherwise return false.

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.distributed.fleet.utils import LocalFS

                >>> local_fs = LocalFS()
                >>> ret = local_fs.is_exist("test_is_exist")

        """
        return os.path.exists(fs_path)

    def touch(self, fs_path: str, exist_ok: bool = True) -> None:
        """
        Create a local file.

        Args:
            fs_path(str): The local file path.
            exist_ok(bool): If `fs_path` already exists and `exist_ok` is set to false,
            an exception is raised. Default is true.

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.distributed.fleet.utils import LocalFS

                >>> client = LocalFS()
                >>> client.touch("test_touch")
                >>> client.delete("test_touch")

        """
        if self.is_exist(fs_path):
            if exist_ok:
                return
            raise FSFileExistsError
        with open(fs_path, 'a'):
            pass

    def mv(
        self,
        src_path: str,
        dst_path: str,
        overwrite: bool = False,
        test_exists: bool = False,
    ) -> None:
        """
        Move a local file or directory from `src_path` to `dst_path` .

        Args:
            src_path(str): Name of the file or directory to be moved.
            dst_path(str): Destination name of the file or directory.
            overwrite(bool): Whether to overwrite `dst_path` if it already exists. Default is False.
            test_exists(bool): Reserved for interface compatibility; not used by LocalFS.

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.distributed.fleet.utils import LocalFS

                >>> client = LocalFS()
                >>> client.touch("test_mv_src")
                >>> client.mv("test_mv_src", "test_mv_dst")
                >>> client.delete("test_mv_dst")

        """
        if not self.is_exist(src_path):
            raise FSFileNotExistsError

        if overwrite and self.is_exist(dst_path):
            self.delete(dst_path)

        if self.is_exist(dst_path):
            raise FSFileExistsError

        return self.rename(src_path, dst_path)

    def list_dirs(self, fs_path: str) -> list[str]:
        """
        Only list directories under `fs_path` .

        Args:
            fs_path(str): The local file path.

        Returns:
            List: A list of all its subdirectories, e.g. [subdirname1, subdirname2, ...].

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.distributed.fleet.utils import LocalFS

                >>> client = LocalFS()
                >>> subdirs = client.list_dirs("./")

        """
        if not self.is_exist(fs_path):
            return []

        dirs = [
            f for f in os.listdir(fs_path) if os.path.isdir(fs_path + "/" + f)
        ]

        return dirs


def _handle_errors(
    max_time_out: float | None = None,
) -> Callable[[Callable[_InputT, _RetT]], Callable[_InputT, _RetT]]:
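    """Retry decorator for file system shell operations.

    The wrapped call is retried as long as it raises `ExecuteError`, sleeping
    `_sleep_inter` milliseconds between attempts. Once `max_time_out`
    milliseconds (or the instance's `_time_out` when `max_time_out` is None)
    have elapsed, `FSTimeOut` is raised instead.
    """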
    def decorator(f: Callable[_InputT, _RetT]) -> Callable[_InputT, _RetT]:
        @functools.wraps(f)
        def handler(*args: _InputT.args, **kwargs: _InputT.kwargs) -> _RetT:
            o = args[0]
            time_out = max_time_out
            if time_out is None:
                time_out = float(o._time_out) / 1000.0
            else:
                time_out /= 1000.0
            inter = float(o._sleep_inter) / 1000.0

            start = time.time()
            last_print_time = start
            while True:
                try:
                    return f(*args, **kwargs)
                # important: only ExecuteError need to retry
                except ExecuteError as e:
                    if time.time() - start >= time_out:
                        raise FSTimeOut(
                            f"args:{args} timeout:{time.time() - start}"
                        )

                    time.sleep(inter)

                if time.time() - last_print_time > 30:
                    print(
                        f"hadoop operator timeout:args:{args} timeout:{time.time() - start}"
                    )
                    last_print_time = time.time()

        return handler

    return decorator


class HDFSClient(FS):
    """
    A tool for HDFS.

    Args:
        hadoop_home(str): Hadoop home.
        configs(dict): Hadoop config. It is a dictionary and needs to contain the
            keys: "fs.default.name" and "hadoop.job.ugi".
        time_out(int): The maximum time in milliseconds to keep retrying a failed
            command before raising `FSTimeOut` . Default is 5 * 60 * 1000.
        sleep_inter(int): The interval in milliseconds between retries. Default is 1000.

    Examples:

        .. code-block:: python

            >>> # doctest: +REQUIRES(env:DISTRIBUTED)
            >>> from paddle.distributed.fleet.utils import HDFSClient
            >>> hadoop_home = "/home/client/hadoop-client/hadoop/"

            >>> configs = {
            ...     "fs.default.name": "hdfs://xxx.hadoop.com:54310",
            ...     "hadoop.job.ugi": "hello,hello123"
            ... }

            >>> client = HDFSClient(hadoop_home, configs)
            >>> client.ls_dir("hdfs:/test_hdfs_client")
            ([], [])

    """

    pre_commands: list[str]

    def __init__(
        self,
        hadoop_home: str,
        configs: _HDFSClientConfig,
        time_out: int = 5 * 60 * 1000,  # ms
        sleep_inter: int = 1000,  # ms
    ) -> None:
        self.pre_commands = []
        hadoop_bin = f'{hadoop_home}/bin/hadoop'
        self.pre_commands.append(hadoop_bin)
        dfs = 'fs'
        self.pre_commands.append(dfs)

        if configs:
            for k, v in configs.items():
                config_command = f'-D{k}={v}'
                self.pre_commands.append(config_command)

        self._time_out = time_out
        self._sleep_inter = sleep_inter
        self._base_cmd = " ".join(self.pre_commands)
        self._bd_err_re = re.compile(
            r'\s?responseErrorMsg\s?\:.*, errorCode\:\s?[0-9]+, path\:'
        )

    def _run_cmd(self, cmd, redirect_stderr=False, retry_times=5):
        exe_cmd = f"{self._base_cmd} -{cmd}"
        ret = 0
        output = None
        retry_sleep_second = 3
        for x in range(retry_times + 1):
            ret, output = core.shell_execute_cmd(exe_cmd, 0, 0, redirect_stderr)
            ret = int(ret)
            if ret == 0:
                break
            time.sleep(retry_sleep_second)
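        # An exit status of 134 (128 + SIGABRT) means the shell command aborted.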
        if ret == 134:
            raise FSShellCmdAborted(cmd)

        return ret, output.splitlines()

    def _run_safe_cmd(self, cmd, redirect_stderr=False, retry_times=5):
        # Split the pre-built base command so subprocess receives a proper
        # argv list instead of a single space-joined string.
        exe_cmd = [*self._base_cmd.split(), *cmd.split()]
        ret = 0
        output = ""
        retry_sleep_second = 3
        for x in range(retry_times + 1):
            try:
                process = subprocess.run(
                    exe_cmd,
                    check=True,
                    stdout=subprocess.PIPE,
                    stderr=(
                        subprocess.STDOUT
                        if redirect_stderr
                        else subprocess.PIPE
                    ),
                    text=True,
                )
                output = process.stdout
                ret = 0
                break
            except subprocess.CalledProcessError as e:
                ret = e.returncode
                output = e.output
                time.sleep(retry_sleep_second)
            except Exception as e:
                logger.warning(f"_run_safe_cmd {exe_cmd} failed: {e}")
                break

        if ret == 134:
            raise FSShellCmdAborted(cmd)

        return ret, output.splitlines()

    @_handle_errors()
    def list_dirs(self, fs_path: str) -> list[str]:
        """
        Only list directories under `fs_path` .

        Args:
            fs_path(str): The HDFS file path.

        Returns:
            List: A list of all its subdirectories, e.g. [subdirname1, subdirname2, ...].

        Examples:

            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.distributed.fleet.utils import HDFSClient

                >>> hadoop_home = "/home/client/hadoop-client/hadoop/"
                >>> configs = {
                ...     "fs.default.name": "hdfs://xxx.hadoop.com:54310",
                ...     "hadoop.job.ugi": "hello,hello123"
                ... }

                >>> client = HDFSClient(hadoop_home, configs)
                >>> subdirs = client.list_dirs("hdfs:/test_hdfs_client")

        """
        if not self.is_exist(fs_path):
            return []

        dirs, files = self._ls_dir(fs_path)
        return dirs

    @_handle_errors()
    def ls_dir(self, fs_path: str) -> tuple[list[str], list[str]]:
        """
        List directories and files under `fs_path` .

        Args:
            fs_path(str): The HDFS file path.

        Returns:
            Tuple: Return a 2-tuple; the first element is the list of all its subdirectories,
            and the second one is the list of all its files, e.g. ([subdirname1, subdirname2, ...], [filename1, filename2, ...]).

        Examples:

            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.distributed.fleet.utils import HDFSClient

                >>> hadoop_home = "/home/client/hadoop-client/hadoop/"
                >>> configs = {
                ...     "fs.default.name": "hdfs://xxx.hadoop.com:54310",
                ...     "hadoop.job.ugi": "hello,hello123"
                ... }

                >>> client = HDFSClient(hadoop_home, configs)
                >>> subdirs, files = client.ls_dir("hdfs:/test_hdfs_client")

        """
        if not self.is_exist(fs_path):
            return [], []

        return self._ls_dir(fs_path)

    def _ls_dir(self, fs_path):
        cmd = f"ls {fs_path}"
        ret, lines = self._run_cmd(cmd)

        if ret != 0:
            raise ExecuteError(cmd)

        dirs = []
        files = []
        for line in lines:
            arr = line.split()
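            # A valid `hadoop fs -ls` file entry has 8 columns:
            # permissions, replicas, owner, group, size, date, time, path.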
            if len(arr) != 8:
                continue

            p = os.path.basename(arr[7])
            if arr[0][0] == 'd':
                dirs.append(p)
            else:
                files.append(p)

        return dirs, files

    def _test_match(self, lines):
        for l in lines:
            m = self._bd_err_re.match(l)
            if m is not None:
                return m

        return None

    @_handle_errors()
    def is_dir(self, fs_path: str) -> bool:
        """
        Whether the remote HDFS path is a directory.

        Args:
            fs_path(str): The HDFS file path.

        Returns:
            Bool: Return true if the path exists and it's a directory, otherwise return false.

        Examples:

            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.distributed.fleet.utils import HDFSClient

                >>> hadoop_home = "/home/client/hadoop-client/hadoop/"
                >>> configs = {
                ...     "fs.default.name": "hdfs://xxx.hadoop.com:54310",
                ...     "hadoop.job.ugi": "hello,hello123"
                ... }

                >>> client = HDFSClient(hadoop_home, configs)
                >>> ret = client.is_dir("hdfs:/test_hdfs_client")

        """
        if not self.is_exist(fs_path):
            return False

        return self._is_dir(fs_path)

    def _is_dir(self, fs_path):
        cmd = f"test -d {fs_path}"
        ret, lines = self._run_cmd(cmd, redirect_stderr=True, retry_times=1)
        if ret:
            # other error
            if self._test_match(lines):
                print('raise exception: ')
                print('\n'.join(lines))
                raise ExecuteError(cmd)

            return False

        return True

    def is_file(self, fs_path: str) -> bool:
        """
        Whether the remote HDFS path is a file.

        Args:
            fs_path(str): The HDFS file path.

        Returns:
            Bool: Return true if the path exists and it's a file, otherwise return false.

        Examples:

            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.distributed.fleet.utils import HDFSClient

                >>> hadoop_home = "/home/client/hadoop-client/hadoop/"
                >>> configs = {
                ...     "fs.default.name": "hdfs://xxx.hadoop.com:54310",
                ...     "hadoop.job.ugi": "hello,hello123"
                ... }

                >>> client = HDFSClient(hadoop_home, configs)
                >>> ret = client.is_file("hdfs:/test_hdfs_client")

        """
        if not self.is_exist(fs_path):
            return False

        return not self._is_dir(fs_path)

    @_handle_errors()
    def is_exist(self, fs_path: str) -> bool:
        """
        Whether the remote HDFS path exists.

        Args:
            fs_path(str): The HDFS file path.

        Returns:
            Bool: Return true if the path exists, whether it is a file or a directory;
            otherwise return false.

        Examples:

            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.distributed.fleet.utils import HDFSClient

                >>> hadoop_home = "/home/client/hadoop-client/hadoop/"
                >>> configs = {
                ...     "fs.default.name": "hdfs://xxx.hadoop.com:54310",
                ...     "hadoop.job.ugi": "hello,hello123"
                ... }

                >>> client = HDFSClient(hadoop_home, configs)
                >>> ret = client.is_exist("hdfs:/test_hdfs_client")

        """
        cmd = f"test -e {fs_path} "
        ret, out = self._run_cmd(cmd, redirect_stderr=True, retry_times=1)
        if ret != 0:
            return False

        return True

    def upload_dir(
        self, local_dir: str, dest_dir: str, overwrite: bool = False
    ) -> None:
        """
        Upload a local directory to HDFS.

        Args:
            local_dir(str): The local directory path.
            dest_dir(str): The destination directory path on HDFS.
            overwrite(bool): Whether to overwrite an existing directory of the same
                name under `dest_dir` . Default is False.
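
        Examples:

            .. code-block:: python

                >>> # doctest: +SKIP('depend on external file')
                >>> from paddle.distributed.fleet.utils import HDFSClient

                >>> hadoop_home = "/home/client/hadoop-client/hadoop/"
                >>> configs = {
                ...     "fs.default.name": "hdfs://xxx.hadoop.com:54310",
                ...     "hadoop.job.ugi": "hello,hello123"
                ... }

                >>> client = HDFSClient(hadoop_home, configs)
                >>> # Illustrative only: the cluster address, credentials and paths are placeholders.
                >>> client.upload_dir("test_upload_dir", "hdfs:/test_hdfs_client")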
        """
        local_dir = local_dir.rstrip("/")
        dest_dir = dest_dir.rstrip("/")
        local_basename = os.path.basename(local_dir)
        if self.is_exist(dest_dir + "/" + local_basename) and overwrite:
            self.delete(dest_dir + "/" + local_basename)
        if not self.is_exist(dest_dir):
            self.mkdirs(dest_dir)
        self._try_upload(local_dir, dest_dir)

    # can't retry
    def upload(
        self,
        local_path: str,
        fs_path: str,
        multi_processes: int = 5,
        overwrite: bool = False,
    ) -> None:
        """
        Upload the local path to remote HDFS.

        Args:
            local_path(str): The local path.
            fs_path(str): The HDFS path.
            multi_processes(int): The number of processes used to upload data concurrently. Default is 5.
            overwrite(bool): Whether to overwrite the file on HDFS if it already exists. Default is False.

        Examples:

            .. code-block:: python

                >>> # doctest: +SKIP('depend on external file')
                >>> from paddle.distributed.fleet.utils import HDFSClient

                >>> hadoop_home = "/home/client/hadoop-client/hadoop/"
                >>> configs = {
                ...     "fs.default.name": "hdfs://xxx.hadoop.com:54310",
                ...     "hadoop.job.ugi": "hello,hello123"
                ... }

                >>> client = HDFSClient(hadoop_home, configs)
                >>> client.upload("test_hdfs_client", "hdfs:/test_hdfs_client")

        """

        def __subprocess_upload(hdfs_path_single, datas):
            for data in datas:
                self._try_upload(data, hdfs_path_single)

        def get_local_files(path):
            """
            Get the list of local files under `path` .

            Args:
                path(str): The local path.
            Returns:
                list: The local file list.
            """
            rlist = []

            if not os.path.exists(path):
                return rlist

            if os.path.isdir(path):
                for file in os.listdir(path):
                    t = os.path.join(path, file)
                    rlist.append(t)
            else:
                rlist.append(path)
            return rlist

        local = LocalFS()
        if not local.is_exist(local_path):
            raise FSFileNotExistsError(f"{local_path} not exists")

        all_files = get_local_files(local_path)
        if not all_files:
            print("there are nothing need to upload, function exit")
            return

        if self.is_exist(fs_path) and overwrite:
            self.delete(fs_path)
            self.mkdirs(fs_path)

        procs = []
        for i in range(multi_processes):
            process_datas = self._split_files(all_files, i, multi_processes)
            p = multiprocessing.Process(
                target=__subprocess_upload, args=(fs_path, process_datas)
            )
            procs.append(p)
            p.start()

        # complete the processes
        for proc in procs:
            proc.join()

    @_handle_errors()
    def _try_upload(self, local_path, fs_path):
        cmd = f"put {local_path} {fs_path}"
        ret = 0
        try:
            ret, _ = self._run_cmd(cmd)
            if ret != 0:
                raise ExecuteError(cmd)
        except Exception as e:
            self.delete(fs_path)
            raise e

    # can't retry
    def download(
        self,
        fs_path: str,
        local_path: str,
        multi_processes: int = 5,
        overwrite: bool = False,
    ) -> None:
        """
        Download remote HDFS path to the local.

        Args:
            fs_path(str):  The HDFS path.
            local_path(str): The local path.
            multi_processes(int): The number of processes used to download data concurrently. Default is 5.
            overwrite(bool): Whether to overwrite existing local files. Default is False.

        Examples:

            .. code-block:: python

                >>> # doctest: +SKIP('depend on external file')
                >>> from paddle.distributed.fleet.utils import HDFSClient

                >>> hadoop_home = "/home/client/hadoop-client/hadoop/"
                >>> configs = {
                ...     "fs.default.name": "hdfs://xxx.hadoop.com:54310",
                ...     "hadoop.job.ugi": "hello,hello123"
                ... }

                >>> client = HDFSClient(hadoop_home, configs)
                >>> client.download("hdfs:/test_hdfs_client", "./")

        """

        def __subprocess_download(local_path, datas):
            """
            Download files from HDFS.

            Args:
                local_path(str): The local file path.
                datas(list): The list of HDFS file paths.
            """
            for data in datas:
                self._try_download(data, local_path)

        if not self.is_exist(fs_path):
            raise FSFileNotExistsError(f"{fs_path} not exits")
        # download file
        if self.is_file(fs_path):
            return self._try_download(fs_path, local_path)
        # download dir
        dirs, all_filenames = self.ls_dir(fs_path)
        all_files = [fs_path + "/" + i for i in all_filenames]
        all_files.extend([fs_path + "/" + i for i in dirs])
        procs = []
        for i in range(multi_processes):
            process_datas = self._split_files(all_files, i, multi_processes)
            p = multiprocessing.Process(
                target=__subprocess_download, args=(local_path, process_datas)
            )
            procs.append(p)
            p.start()

        # complete the processes
        for proc in procs:
            proc.join()

    @_handle_errors()
    def _try_download(self, fs_path, local_path):
        cmd = f"get {fs_path} {local_path}"
        ret = 0
        try:
            ret, _ = self._run_cmd(cmd)
            if ret != 0:
                raise ExecuteError(cmd)
        except Exception as e:
            local_fs = LocalFS()
            local_fs.delete(local_path)
            raise e

    @_handle_errors()
    def mkdirs(self, fs_path: str) -> None:
        """
        Create a remote HDFS directory.

        Args:
            fs_path(str): The HDFS directory path.

        Examples:

            .. code-block:: python

                >>> # doctest: +SKIP('depend on external file')
                >>> from paddle.distributed.fleet.utils import HDFSClient

                >>> hadoop_home = "/home/client/hadoop-client/hadoop/"
                >>> configs = {
                ...     "fs.default.name": "hdfs://xxx.hadoop.com:54310",
                ...     "hadoop.job.ugi": "hello,hello123"
                ... }

                >>> client = HDFSClient(hadoop_home, configs)
                >>> client.mkdirs("hdfs:/test_hdfs_client")

        """
        if self.is_exist(fs_path):
            return

        out_hdfs = False
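
        # Plain `mkdir` fails when parent directories are missing; in that case
        # fall back to `mkdir -p` below to create them.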

        cmd = f"mkdir {fs_path} "
        ret, out = self._run_cmd(cmd, redirect_stderr=True)
        if ret != 0:
            for l in out:
                if "No such file or directory" in l:
                    out_hdfs = True
                    break
            if not out_hdfs:
                raise ExecuteError(cmd)

        if out_hdfs and not self.is_exist(fs_path):
            cmd = f"mkdir -p {fs_path}"
            ret, _ = self._run_cmd(cmd)
            if ret != 0:
                raise ExecuteError(cmd)

    def mv(
        self,
        fs_src_path: str,
        fs_dst_path: str,
        overwrite: bool = False,
        test_exists: bool = True,
    ) -> None:
        """
        Move a remote HDFS file or directory from `fs_src_path` to `fs_dst_path` .

        Args:
            fs_src_path(str): Name of the file or directory to be moved.
            fs_dst_path(str): Destination name of the file or directory.
            overwrite(bool): Whether to overwrite `fs_dst_path` if it already exists. Default is False.
            test_exists(bool): Check the existence of `fs_src_path` and `fs_dst_path` . When `test_exists` is set true, an exception is raised if `fs_src_path` doesn't exist or `fs_dst_path` exists. Default is True.

        Examples:

            .. code-block:: python

                >>> # doctest: +SKIP('depend on external file')
                >>> from paddle.distributed.fleet.utils import HDFSClient

                >>> hadoop_home = "/home/client/hadoop-client/hadoop/"
                >>> configs = {
                ...     "fs.default.name": "hdfs://xxx.hadoop.com:54310",
                ...     "hadoop.job.ugi": "hello,hello123"
                ... }

                >>> client = HDFSClient(hadoop_home, configs)
                >>> client.mv("hdfs:/test_hdfs_client", "hdfs:/test_hdfs_client2")

        """
        if overwrite and self.is_exist(fs_dst_path):
            self.delete(fs_dst_path)

        if test_exists:
            if not self.is_exist(fs_src_path):
                raise FSFileNotExistsError(f"{fs_src_path} is not exists")

            if self.is_exist(fs_dst_path):
                raise FSFileExistsError(f"{fs_dst_path} exists already")

        return self._try_mv(fs_src_path, fs_dst_path)

    @_handle_errors()
    def _try_mv(self, fs_src_path, fs_dst_path):
        cmd = f"mv {fs_src_path} {fs_dst_path}"
        ret = 0
        try:
            ret, _ = self._run_cmd(cmd, retry_times=1)
            if ret != 0:
                raise ExecuteError(cmd)
        except Exception as e:
            if not self.is_exist(fs_src_path) and self.is_exist(fs_dst_path):
                return
            raise e

    def _rmr(self, fs_path):
        cmd = f"rmr {fs_path}"
        ret, _ = self._run_cmd(cmd)
        if ret != 0:
            raise ExecuteError(cmd)

    def _rm(self, fs_path):
        cmd = f"rm {fs_path}"
        ret, _ = self._run_cmd(cmd)
        if ret != 0:
            raise ExecuteError(cmd)

    @_handle_errors()
    def delete(self, fs_path: str) -> None:
        """
        Delete a remote HDFS path, whether it's a file or directory.

        Args:
            fs_path(str): The HDFS file path.

        Examples:

            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.distributed.fleet.utils import HDFSClient

                >>> hadoop_home = "/home/client/hadoop-client/hadoop/"
                >>> configs = {
                ...     "fs.default.name": "hdfs://xxx.hadoop.com:54310",
                ...     "hadoop.job.ugi": "hello,hello123"
                ... }

                >>> client = HDFSClient(hadoop_home, configs)
                >>> client.delete("hdfs:/test_hdfs_client")

        """
        if not self.is_exist(fs_path):
            return

        is_dir = self._is_dir(fs_path)
        if is_dir:
            return self._rmr(fs_path)

        return self._rm(fs_path)

    def touch(self, fs_path: str, exist_ok: bool = True) -> None:
        """
        Create a remote HDFS file.

        Args:
            fs_path(str): The HDFS file path.
            exist_ok(bool): If `fs_path` already exists and `exist_ok` is set to false,
            an exception is raised. Default is true.

        Examples:

            .. code-block:: python

                >>> # doctest: +SKIP('depend on external file')
                >>> from paddle.distributed.fleet.utils import HDFSClient

                >>> hadoop_home = "/home/client/hadoop-client/hadoop/"
                >>> configs = {
                ...     "fs.default.name": "hdfs://xxx.hadoop.com:54310",
                ...     "hadoop.job.ugi": "hello,hello123"
                ... }

                >>> client = HDFSClient(hadoop_home, configs)
                >>> client.touch("hdfs:/test_hdfs_client")

        """
        if self.is_exist(fs_path):
            if exist_ok:
                return
            raise FSFileExistsError

        return self._touchz(fs_path)

    @_handle_errors()
    def _touchz(self, fs_path):
        cmd = f"touchz {fs_path}"
        ret, _ = self._run_cmd(cmd)
        if ret != 0:
            raise ExecuteError(cmd)

    def need_upload_download(self) -> Literal[True]:
        return True

    def cat(self, fs_path: str | None = None) -> str:
        """
        Cat a remote HDFS file.

        Args:
            fs_path(str|None): The HDFS file path.

        Returns:
            Str: The file content.

        Examples:

            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.distributed.fleet.utils import HDFSClient

                >>> hadoop_home = "/home/client/hadoop-client/hadoop/"
                >>> configs = {
                ...     "fs.default.name": "hdfs://xxx.hadoop.com:54310",
                ...     "hadoop.job.ugi": "hello,hello123"
                ... }

                >>> client = HDFSClient(hadoop_home, configs)
                >>> client.cat("hdfs:/test_hdfs_client")
                ''

        """
        if self.is_file(fs_path):
            output = self._try_cat(fs_path)
            return "\n".join(output)
        else:
            return ""

    @_handle_errors()
    def _try_cat(self, fs_path):
        cmd = f"cat {fs_path}"
        ret, output = self._run_cmd(cmd, retry_times=1)
        if ret != 0:
            raise ExecuteError(cmd)
        return output

    def _split_files(self, files, trainer_id, trainers):
        """
        Split the file list evenly across trainers.

        Args:
            files(list): The file list.
            trainer_id(int): The trainer mpi rank id.
            trainers(int): The total number of trainers.
        Returns:
            filelist(list): The file list assigned to the current trainer.
        """
        remainder = len(files) % trainers
        blocksize = len(files) // trainers

        blocks = [blocksize] * trainers
        for i in range(remainder):
            blocks[i] += 1

        trainer_files = [[]] * trainers
        begin = 0
        for i in range(trainers):
            trainer_files[i] = files[begin : begin + blocks[i]]
            begin += blocks[i]

        return trainer_files[trainer_id]

    def list_files_info(self, path_list: list[str]) -> list[_FileInfo]:
        """
        List files and return the path and size of each one.

        Args:
            path_list(list): The file path list.
        Returns:
            filelist(list): A list of dicts, each containing a file's 'path' and 'size'.
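
        Examples:

            .. code-block:: python

                >>> # doctest: +SKIP('depend on external file')
                >>> from paddle.distributed.fleet.utils import HDFSClient

                >>> hadoop_home = "/home/client/hadoop-client/hadoop/"
                >>> configs = {
                ...     "fs.default.name": "hdfs://xxx.hadoop.com:54310",
                ...     "hadoop.job.ugi": "hello,hello123"
                ... }

                >>> client = HDFSClient(hadoop_home, configs)
                >>> # Illustrative only: the cluster address, credentials and paths are placeholders.
                >>> infos = client.list_files_info(["hdfs:/test_hdfs_client"])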
        """
        if len(path_list) <= 0:
            return []

        file_list = []

        # concat filelist can speed up 'hadoop ls'
        str_concat = ""
        for path in path_list:
            str_concat += path + " "
        cmd = (
            "ls " + str_concat + " | awk '{if ($8 != \"\") {print $5\" \"$8 }}'"
        )
        ret, lines = self._run_cmd(cmd)
        if len(lines) == 0:
            logger.warning(f"list_files empty, path[{path_list}]")
            return []
        for line in lines:
            arr = line.split(' ')
            if len(arr) < 2:
                continue
            file_path = arr[1]
            file_size = int(arr[0])
            file_list.append({'path': file_path, 'size': file_size})

        return file_list


class AFSClient(FS):
    """
    A tool for AFS, built on top of core.AfsWrapper.
    When WITH_PSLIB=ON, you can use this class directly.
    When WITH_PSCORE=ON, you should export LD_LIBRARY_PATH='YOUR_AFSAPISO_PATH' before using this class.

    Examples:

        .. code-block:: python

            >>> # doctest: +SKIP('depend on external file')
            >>> from paddle.distributed.fleet.utils.fs import AFSClient

            >>> client = AFSClient()
            >>> client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
            >>> client.ls_dir("hdfs:/test_hdfs_client")

    """

    def __init__(self, time_out=5 * 60 * 1000, sleep_inter=1000):  # both in ms
        self._fs = core.AfsWrapper()
        self._time_out = time_out

    def init(self, fs_name, fs_user, fs_passwd, fs_conf):
        self._fs.init(fs_name, fs_user, fs_passwd, fs_conf)

    def list_dirs(self, fs_path):
        """
        Only list directories under `fs_path` .

        Args:
            fs_path(str): The HDFS file path.

        Returns:
            List: A list of all its subdirectories, e.g. [subdirname1, subdirname2, ...].

        Examples:

            .. code-block:: python

                >>> # doctest: +SKIP('depend on external file')
                >>> from paddle.distributed.fleet.utils.fs import AFSClient

                >>> client = AFSClient()
                >>> client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
                >>> subdirs = client.list_dirs("hdfs:/test_hdfs_client")

        """
        if not self.is_exist(fs_path):
            return []
        # TODO:fengdanlei
        dirs, files = self._ls_dir(fs_path)
        return dirs

    def ls_dir(self, fs_path):
        """
        List directories and files under `fs_path` .

        Args:
            fs_path(str): The HDFS file path.

        Returns:
            Tuple: Return a 2-tuple; the first element is the list of all its subdirectories,
            and the second one is the list of all its files, e.g. ([subdirname1, subdirname2, ...], [filename1, filename2, ...]).

        Examples:

            .. code-block:: python

                >>> # doctest: +SKIP('depend on external file')
                >>> from paddle.distributed.fleet.utils.fs import AFSClient

                >>> client = AFSClient()
                >>> client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
                >>> subdirs, files = client.ls_dir("hdfs:/test_hdfs_client")

        """
        if not self.is_exist(fs_path):
            return [], []

        return self._ls_dir(fs_path)

    def _ls_dir(self, fs_path):
        files = self._fs.list(fs_path)
        dirs = [fs_path]
        return dirs, files

    def is_dir(self, fs_path):
        """
        Whether the remote HDFS path is a directory.

        Args:
            fs_path(str): The HDFS file path.

        Returns:
            Bool: Return true if the path exists and it's a directory, otherwise return false.

        Examples:

            .. code-block:: python

                >>> # doctest: +SKIP('depend on external file')
                >>> from paddle.distributed.fleet.utils.fs import AFSClient

                >>> client = AFSClient()
                >>> client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
                >>> ret = client.is_dir("hdfs:/test_hdfs_client")

        """
        if not self.is_exist(fs_path):
            return False

        return self._is_dir(fs_path)

    def _is_dir(self, fs_path):
        list_path = self._fs.list(fs_path)
        return len(list_path) > 0

    def is_file(self, fs_path):
        """
        Whether the remote HDFS path is a file.

        Args:
            fs_path(str): The HDFS file path.

        Returns:
            Bool: Return true if the path exists and it's a file, otherwise return false.

        Examples:

            .. code-block:: python

                >>> # doctest: +SKIP('depend on external file')
                >>> from paddle.distributed.fleet.utils.fs import AFSClient

                >>> client = AFSClient()
                >>> client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
                >>> ret = client.is_file("hdfs:/test_hdfs_client")

        """
        if not self.is_exist(fs_path):
            return False

        return not self._is_dir(fs_path)

    def is_exist(self, fs_path):
        """
        Whether the remote HDFS path exists.

        Args:
            fs_path(str): The HDFS file path.

        Returns:
            Bool: Return true if the path exists, whether it is a file or a directory;
            otherwise return false.

        Examples:

            .. code-block:: python

                >>> # doctest: +SKIP('depend on external file')
                >>> from paddle.distributed.fleet.utils.fs import AFSClient

                >>> client = AFSClient()
                >>> client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
                >>> ret = client.is_exist("hdfs:/test_hdfs_client")

        """
        return self._fs.exist(fs_path)

    def upload_dir(self, local_dir, dest_dir, overwrite=False):
        """
        Upload a local directory to HDFS.

        Args:
            local_dir(str): The local directory path.
            dest_dir(str): The destination directory path on HDFS.
            overwrite(bool): Whether to overwrite an existing directory of the same
                name under `dest_dir` . Default is False.
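
        Examples:

            .. code-block:: python

                >>> # doctest: +SKIP('depend on external file')
                >>> from paddle.distributed.fleet.utils.fs import AFSClient

                >>> client = AFSClient()
                >>> client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
                >>> # Illustrative only: the cluster address, credentials and paths are placeholders.
                >>> client.upload_dir("test_upload_dir", "hdfs:/test_hdfs_client")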
        """
        local_dir = local_dir.rstrip("/")
        dest_dir = dest_dir.rstrip("/")
        local_basename = os.path.basename(local_dir)
        if self.is_exist(dest_dir + "/" + local_basename) and overwrite:
            self.delete(dest_dir + "/" + local_basename)
        if not self.is_exist(dest_dir):
            self.mkdirs(dest_dir)
        self._fs.upload(local_dir, dest_dir)

    # can't retry
    def upload(self, local_path, fs_path, multi_processes=1, overwrite=False):
        """
        Upload the local path to remote HDFS.

        Args:
            local_path(str): The local path.
            fs_path(str): The HDFS path.
            multi_processes(int): The number of processes used to upload data concurrently. Default is 1.
            overwrite(bool): Whether to overwrite the file on HDFS if it already exists. Default is False.

        Examples:

            .. code-block:: python

                >>> # doctest: +SKIP('depend on external file')
                >>> from paddle.distributed.fleet.utils.fs import AFSClient

                >>> client = AFSClient()
                >>> client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
                >>> client.upload("test_hdfs_client", "hdfs:/test_hdfs_client")

        """

        local = LocalFS()
        if not local.is_exist(local_path):
            raise FSFileNotExistsError(f"{local_path} not exists")

        self._fs.upload(local_path, fs_path)

    def download(self, fs_path, local_path, multi_processes=1, overwrite=False):
        """
        Download remote HDFS path to the local.

        Args:
            fs_path(str):  The HDFS path.
            local_path(str): The local path.
            multi_processes(int): The number of processes used to download data concurrently. Default is 1.
            overwrite(bool): Whether to overwrite existing local files. Default is False.

        Examples:

            .. code-block:: python

                >>> # doctest: +SKIP('depend on external file')
                >>> from paddle.distributed.fleet.utils.fs import AFSClient

                >>> client = AFSClient()
                >>> client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
                >>> client.download("hdfs:/test_hdfs_client", "./")

        """

        if not self.is_exist(fs_path):
            raise FSFileNotExistsError(f"{fs_path} not exits")
        # download file
        if self.is_file(fs_path):
            return self._fs.download(local_path, fs_path)
        # download dir
        # all_filenames contains full AFS paths
        _, all_filenames = self.ls_dir(fs_path)
        for file_name in all_filenames:
            local_file_name = os.path.join(
                local_path, os.path.split(file_name)[1]
            )
            self._fs.download(local_file_name, file_name)

    def mkdirs(self, fs_path):
        """
        Create a remote HDFS directory.

        Args:
            fs_path(str): The HDFS directory path.

        Examples:

            .. code-block:: python

                >>> # doctest: +SKIP('depend on external file')
                >>> from paddle.distributed.fleet.utils.fs import AFSClient

                >>> client = AFSClient()
                >>> client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
                >>> client.mkdirs("hdfs:/test_hdfs_client")

        """
        if self.is_exist(fs_path):
            return
        self._fs.mkdir(fs_path)

    def mv(self, fs_src_path, fs_dst_path, overwrite=False, test_exists=True):
        """
        Move a remote HDFS file or directory from `fs_src_path` to `fs_dst_path` .

        Args:
            fs_src_path(str): Name of the file or directory to be moved.
            fs_dst_path(str): Destination name of the file or directory.
            overwrite(bool): Whether to overwrite `fs_dst_path` if it already exists. Default is False.
            test_exists(bool): Check the existence of `fs_src_path` and `fs_dst_path` . When `test_exists` is set true, an exception is raised if `fs_src_path` doesn't exist or `fs_dst_path` exists. Default is True.

        Examples:

            .. code-block:: python

                >>> # doctest: +SKIP('depend on external file')
                >>> from paddle.distributed.fleet.utils.fs import AFSClient

                >>> client = AFSClient()
                >>> client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
                >>> client.mv("hdfs:/test_hdfs_client", "hdfs:/test_hdfs_client2")

        """
        if overwrite and self.is_exist(fs_dst_path):
            self.delete(fs_dst_path)

        if test_exists:
            if not self.is_exist(fs_src_path):
                raise FSFileNotExistsError(f"{fs_src_path} is not exists")

            if self.is_exist(fs_dst_path):
                raise FSFileExistsError(f"{fs_dst_path} exists already")

        self._fs.mv(fs_src_path, fs_dst_path)

    def delete(self, fs_path):
        """
        Delete a remote HDFS path, whether it's a file or directory.

        Args:
            fs_path(str): The HDFS file path.

        Examples:

            .. code-block:: python

                >>> # doctest: +SKIP('depend on external file')
                >>> from paddle.distributed.fleet.utils.fs import AFSClient

                >>> client = AFSClient()
                >>> client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
                >>> client.delete("hdfs:/test_hdfs_client")

        """
        if not self.is_exist(fs_path):
            return
        self._fs.remove(fs_path)

    def touch(self, fs_path, exist_ok=True):
        """
        Create a remote HDFS file.

        Args:
            fs_path(str): The HDFS file path.
            exist_ok(bool): If `fs_path` already exists and `exist_ok` is set to false,
            an exception is raised. Default is true.

        Examples:

            .. code-block:: python

                >>> # doctest: +SKIP('depend on external file')
                >>> from paddle.distributed.fleet.utils.fs import AFSClient

                >>> client = AFSClient()
                >>> client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
                >>> client.touch("hdfs:/test_hdfs_client")

        """
        if self.is_exist(fs_path):
            if exist_ok:
                return
            raise FSFileExistsError

        return self._fs.touchz(fs_path)

    def need_upload_download(self):
        return True

    def cat(self, fs_path=None):
        """
        Cat a remote HDFS file.

        Args:
            fs_path(str|None): The HDFS file path.

        Returns:
            Str: The file content.

        Examples:

            .. code-block:: python

                >>> # doctest: +SKIP('depend on external file')
                >>> from paddle.distributed.fleet.utils.fs import AFSClient

                >>> client = AFSClient()
                >>> client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
                >>> client.cat("hdfs:/test_hdfs_client")

        """
        if self.is_file(fs_path):
            return self._fs.cat(fs_path)
        else:
            return ""

    def _split_files(self, files, trainer_id, trainers):
        """
        Split the file list evenly across trainers.

        Args:
            files(list): The file list.
            trainer_id(int): The trainer mpi rank id.
            trainers(int): The total number of trainers.
        Returns:
            filelist(list): The file list assigned to the current trainer.
        """
        remainder = len(files) % trainers
        blocksize = len(files) // trainers

        blocks = [blocksize] * trainers
        for i in range(remainder):
            blocks[i] += 1

        trainer_files = [[]] * trainers
        begin = 0
        for i in range(trainers):
            trainer_files[i] = files[begin : begin + blocks[i]]
            begin += blocks[i]

        return trainer_files[trainer_id]
