Java多线程实现FTP批量上传文件

Aggie ·
更新时间:2024-11-14
· 510 次阅读

本文实例为大家分享了Java多线程实现FTP批量上传文件的具体代码,供大家参考,具体内容如下

1、构建FTP客户端 package cn.com.pingtech.common.ftp; import lombok.extern.slf4j.Slf4j; import org.apache.commons.net.ftp.FTPClient; import org.apache.commons.net.ftp.FTPReply; import java.io.*; import java.net.UnknownHostException; @Slf4j public class  FtpConnection {     private FTPClient ftp = new FTPClient();     private boolean is_connected = false;     /**      * 构造函数      */     public FtpConnection() {         is_connected = false;         ftp.setDefaultTimeout(FtpConfig.defaultTimeoutSecond * 1000);         ftp.setConnectTimeout(FtpConfig.connectTimeoutSecond * 1000);         ftp.setDataTimeout(FtpConfig.dataTimeoutSecond * 1000);         try {             initConnect(FtpConfig.host, FtpConfig.port, FtpConfig.user, FtpConfig.password);         } catch (IOException e) {             e.printStackTrace();         }     }     /**      * 初始化连接      *      * @param host      * @param port      * @param user      * @param password      * @throws IOException      */     private void initConnect(String host, int port, String user, String password) throws IOException {         try {             ftp.connect(host, port);         } catch (UnknownHostException ex) {             throw new IOException("Can't find FTP server '" + host + "'");         }         int reply = ftp.getReplyCode();//220 连接成功         if (!FTPReply.isPositiveCompletion(reply)) {             disconnect();             throw new IOException("Can't connect to server '" + host + "'");         }         if (!ftp.login(user, password)) {             is_connected = false;             disconnect();             throw new IOException("Can't login to server '" + host + "'");         } else {             is_connected = true;         }     }     /**      * 上传文件      *      * @param path      * @param ftpFileName      * @param localFile      * @throws IOException      */     public boolean upload(String path, String ftpFileName, File localFile) throws IOException {         boolean is  = false;         //检查本地文件是否存在         if (!localFile.exists()) {             throw new IOException("Can't upload '" + localFile.getAbsolutePath() + "'. This file doesn't exist.");         }         //设置工作路径         setWorkingDirectory(path);         //上传         InputStream in = null;         try {             //被动模式             ftp.enterLocalPassiveMode();             in = new BufferedInputStream(new FileInputStream(localFile));             //保存文件             is = ftp.storeFile(ftpFileName, in);         }catch (Exception e){             e.printStackTrace();         }         finally {             try {                 in.close();             } catch (IOException ex) {                 ex.printStackTrace();             }         }         return is;     }     /**      * 关闭连接      *      * @throws IOException      */     public void disconnect() throws IOException {         if (ftp.isConnected()) {             try {                 ftp.logout();                 ftp.disconnect();                 is_connected = false;             } catch (IOException ex) {                 ex.printStackTrace();             }         }     }     /**      * 设置工作路径      *      * @param dir      * @return      */     private boolean setWorkingDirectory(String dir) {         if (!is_connected) {             return false;         }         //如果目录不存在创建目录         try {             if (createDirecroty(dir)) {                 return ftp.changeWorkingDirectory(dir);             }         } catch (IOException e) {             e.printStackTrace();         }         return false;     }     /**      * 是否连接      *      * @return      */     public boolean isConnected() {         return is_connected;     }     /**      * 创建目录      *      * @param remote      * @return      * @throws IOException      */     private boolean createDirecroty(String remote) throws IOException {         boolean success = true;         String directory = remote.substring(0, remote.lastIndexOf("/") + 1);         // 如果远程目录不存在,则递归创建远程服务器目录         if (!directory.equalsIgnoreCase("/") && !ftp.changeWorkingDirectory(new String(directory))) {             int start = 0;             int end = 0;             if (directory.startsWith("/")) {                 start = 1;             } else {                 start = 0;             }             end = directory.indexOf("/", start);             while (true) {                 String subDirectory = new String(remote.substring(start, end));                 if (!ftp.changeWorkingDirectory(subDirectory)) {                     if (ftp.makeDirectory(subDirectory)) {                         ftp.changeWorkingDirectory(subDirectory);                     } else {                         log.error("mack directory error :/" + subDirectory);                         return false;                     }                 }                 start = end + 1;                 end = directory.indexOf("/", start);                 // 检查所有目录是否创建完毕                 if (end <= start) {                     break;                 }             }         }         return success;     } } 2、FTP连接工厂 package cn.com.pingtech.common.ftp; import lombok.extern.slf4j.Slf4j; import java.io.IOException; import java.util.concurrent.ArrayBlockingQueue; /**  * 连接工厂  */ @Slf4j public class FtpFactory {     //有界队列     private static final ArrayBlockingQueue<FtpConnection> arrayBlockingQueue = new ArrayBlockingQueue<>(FtpConfig.ftpConnectionSize);     protected FtpFactory(){         log.info("init ftpConnectionSize "+FtpConfig.ftpConnectionSize);         for(int i = 0; i< FtpConfig.ftpConnectionSize; i++){             //表示如果可能的话,将 e 加到 BlockingQueue 里,即如果 BlockingQueue 可以容纳,则返回 true,否则返回 false             arrayBlockingQueue.offer(new FtpConnection());         }     }     /**      * 获取连接      *      * @return      */     public FtpConnection getFtp() {         FtpConnection poll = null;         try {             //取走 BlockingQueue 里排在首位的对象,若 BlockingQueue 为空,阻断进入等待状态直到 Blocking 有新的对象被加入为止             poll = arrayBlockingQueue.take();         } catch (InterruptedException e) {             e.printStackTrace();         }         return poll;     }     /**      * 释放连接      * @param ftp      * @return      */     public boolean relase(FtpConnection ftp){         return arrayBlockingQueue.offer(ftp);     }     /**      * 删除连接      *      * @param ftp      */     public void remove(FtpConnection ftp) {         arrayBlockingQueue.remove(ftp);     }     /**      * 关闭连接      */     public void close() {         for (FtpConnection connection : arrayBlockingQueue) {             try {                 connection.disconnect();             } catch (IOException e) {                 e.printStackTrace();             }         }     } } 3、FTP配置 package cn.com.pingtech.common.ftp; /**  * ftp 配置类  */ public class FtpConfig {     public static int defaultTimeoutSecond = 10;     public static int connectTimeoutSecond = 10;     public static int dataTimeoutSecond = 10;     public static String host = "127.0.0.1";     public static int port =9999;     public static String user = "Administrator";     public static String password ="Yp886611";     public static int threadPoolSize = 1;     public static int ftpConnectionSize = 1; } 4、构建多线程FTP上传任务 package cn.com.pingtech.common.ftp; import java.io.File; import java.io.IOException; import java.util.concurrent.Callable; /**  * 上传任务  */ public class UploadTask implements Callable{     private File file;     private FtpConnection ftp;     private String path;     private String fileName;     private FtpFactory factory;     public UploadTask(FtpFactory factory,FtpConnection ftp, File file, String path, String fileName){         this.factory = factory;         this.ftp = ftp;         this.file = file;         this.path = path;         this.fileName = fileName;     }     @Override     public UploadResult call() throws Exception {         UploadResult result = null;         try {             if (ftp == null) {                 result = new UploadResult(file.getAbsolutePath(), false);                 return result;             }             //如果连接未开启 重新获取连接             if (!ftp.isConnected()) {                 factory.remove(ftp);                 ftp = new FtpConnection();             }             //开始上传             result = new UploadResult(file.getName(), ftp.upload(path, fileName, file));         } catch (IOException ex) {             result = new UploadResult(file.getName(), false);             ex.printStackTrace();         } finally {             factory.relase(ftp);//释放连接         }         return result;     } } package cn.com.pingtech.common.ftp; /**  * 上传结果  */ public class UploadResult {     private String fileName; //文件名称     private boolean result; //是否上传成功     public UploadResult(String fileName, boolean result) {         this.fileName = fileName;         this.result = result;     }     public String getFileName() {         return fileName;     }     public void setFileName(String fileName) {         this.fileName = fileName;     }     public boolean isResult() {         return result;     }     public void setResult(boolean result) {         this.result = result;     }     public String toString() {         return "[fileName=" + fileName + " , result=" + result + "]";     } }

注意:实现Callable接口的任务线程能返回执行结果
Callable接口支持返回执行结果,此时需要调用FutureTask.get()方法实现,此方法会阻塞线程直到获取“将来”的结果,当不调用此方法时,主线程不会阻塞

5、FTP上传工具类 package cn.com.pingtech.common.ftp; import java.io.File; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; /**  * ftp上传工具包  */ public class FtpUtil {     /**      * 上传文件      *      * @param ftpPath      * @param listFiles      * @return      */     public static synchronized List upload(String ftpPath, File[] listFiles) {         //构建线程池         ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(FtpConfig.threadPoolSize);         List<Future> results = new ArrayList<>();         //创建n个ftp链接         FtpFactory factory = new FtpFactory();         for (File file : listFiles) {             FtpConnection ftp = factory.getFtp();//获取ftp con             UploadTask upload = new UploadTask(factory,ftp, file, ftpPath, file.getName());             Future submit = newFixedThreadPool.submit(upload);             results.add(submit);         }         List listResults = new ArrayList<>();         for (Future result : results) {             try {                 //获取线程结果                 UploadResult uploadResult = (UploadResult)result.get(30, TimeUnit.MINUTES);                 listResults.add(uploadResult);             } catch (Exception e) {                 e.printStackTrace();             }         }         factory.close();         newFixedThreadPool.shutdown();         return listResults;     } } 6、测试上传 package cn.com.pingtech.common.ftp class Client {     public static void main(String[] args) throws IOException {         String loalPath = "C:\\Users\\Administrator\\Desktop\\test\\0";         String ftpPath = "/data/jcz/";         File parentFile = new File(loalPath);         List <UploadResult> list = FtpUtil.upload(ftpPath,parentFile.listFiles());         for(UploadResult vo:list){             System.out.println(vo);         }     } }

注意:FTP协议里面,规定文件名编码为iso-8859-1,所以目录名或文件名需要转码



java多线程 JAVA 上传文件 ftp 线程

需要 登录 后方可回复, 如果你还没有账号请 注册新账号