嵌入式开发第31、32天(项目2:用线程池实现大批量复制文件)

论坛 期权论坛 脚本     
匿名网站用户   2020-12-21 05:20   11   0

项目目标


1)在控制台调用程序输入 源文件 和目标文件,实现文件的完美复制。
2)即把文件的属性也复制了。
3)把批量的复制工作交给线程池去运行,体现多线程的效率。
4)在学会使用线程池的同时,了解线程池的运作机制,并且能够为我所用。

项目框架



1.创建若干线程,置入线程池

2.任务达到时,从线程池取空闲线程

3.取得了空闲线程,立即进行任务处理

4.否则新建一个线程,并置入线程池,执行3

5.如果创建失败或者线程池已满,根据设计策略选择返回错误或将任务置入处理队列,等待处理

6.销毁线程池


项目感想

1:做项目真的能够很好的巩固自己所学的东西
2:刚开始学习的线程池的时候真的学习的非常辛苦
3:里面的各种互斥,多线程并行工作的逻辑方式,真是抽象的不得了。
4:在做一两条线程的时候,你能很好的理清他们的运行方向,当你做到线程池的时候,成倍的难度增。可当你完成的时候你又能无比的了解他的运行方式。
5:虽然项目是实现出来了,复制的速度也确实比原来快个几秒,但这个项目并不能很好的体现出线程池的威力,这是我的个人感觉
6:其实选择用线程池实现复制功能,是因为 (电脑里面的文件) 能够给我带来大量的任务~而不是我自己去一个个的添加这些任务。
7:做这个项目,线程的另外一个很强大的功能没有体现,就是他的框架,线程池是可以添加各种各样的任务,而我为了嫌麻烦并没有搞,所以挖个坑,看以后有没有机会加上去。





项目代码

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <strings.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <dirent.h>
#include <stdbool.h>
#include <string.h>
#include <errno.h>

#define MAX_WAITING_TASKS 1000  //等待任务最大数目
#define MAX_ACTIVE_THREADS 200   //最大线程数

struct task      //任务链表
{
 void *(*task)(void *arg); //void *  能够放置任何格式,只要在调用时强制转换
 void *arg;    

 struct task *next;
};

typedef struct thread_pool  //线程池 其实就是一个线程的结构体
{
 pthread_mutex_t lock;  //互斥锁
 pthread_cond_t  cond;  //条件变量  跟互斥锁是搭配使用
 struct task *task_list;  //一个任务节点

 pthread_t *tids;   //线程属性变量

 unsigned waiting_tasks;  //等待任务
 unsigned active_threads; //执行线程
 
 bool shutdown;    //一个线程池销毁开关
}thread_pool;

 struct file     //一个结构体 存放文件路径 和  复制后路径的
{
 char srcfile[4096];
 char dstfile[4096];
};

bool add_task(thread_pool *pool,void *(*task)(void *arg), void *arg);

void *copyregfile(void * arg) //复制文件函数 参数是文件结构体
{
 

 struct file *dofile = (struct file *)arg; //强制转换 赋值给文件结构体

 //printf("srcfile=%s\n",dofile->srcfile); //查看 信息是否正确
 //printf("dstfile =%s\n",dofile->dstfile );

 struct stat file_stat;    //这个结构体来自#include <sys/stat.h>
          //里面存放着一个文件的属性
 /*
 
 struct stat 
 {
  dev_t st_dev; //文件的设备编号
  ino_t st_ino; //节点
  mode_t st_mode; //文件的类型和存取的权限
  nlink_t st_nlink; //连到该文件的硬连接数目,刚建立的文件值为1
  uid_t st_uid; //用户ID
  gid_t st_gid; //组ID
  dev_t st_rdev; //(设备类型)若此文件为设备文件,则为其设备编号
  off_t st_size; //文件字节数(文件大小)
  unsigned long st_blksize; //块大小(文件系统的I/O 缓冲区大小)
  unsigned long st_blocks; //块数
  time_t st_atime; //最后一次访问时间
  time_t st_mtime; //最后一次修改时间
  time_t st_ctime; //最后一次改变时间(指属性)
 };
 
 */
          
 stat(dofile->srcfile, &file_stat);//通过文件名 获取文件的属性把他存放到结构体里面

 int srcfd,dstfd;     
 srcfd = open(dofile->srcfile,O_RDONLY);//用只读的方式打开源文件
 if(srcfd == -1 )
 {
  printf("open file %s\n failed.\n",dofile->srcfile);
  return NULL;
 }
 
 dstfd = open(dofile->dstfile,O_CREAT | O_TRUNC | O_RDWR,file_stat.st_mode);//以源文件的类型和权限创建文件
                   //st_mode存放的东西看下面*1*
 if( dstfd == -1)
 {
  printf("open file %s failed.\n",dofile->dstfile);
  return NULL;
 }

 int nread;
 char buf[100];
 while((nread = read(srcfd,buf,100)) > 0)  //读取源文件的内容
 {
  if( write(dstfd,buf,nread) == -1)   //把读到的全部写进目标文件
  {
   break;
  }
 }

 close(srcfd);
 close(dstfd);
 return NULL;
}



//拷贝目录,成功返回0.失败返回-1
int copydir( struct file *dofile,thread_pool *pool)
{
 struct stat file_stat;    
 stat(dofile->srcfile,&file_stat); //获取文件的属性
 mkdir(dofile->dstfile,file_stat.st_mode); //以源目录的类型和目录来创建一个目录

 DIR *srcdir = opendir(dofile->srcfile); //打开源目录
 struct dirent *dp;
 
 
 
 while( (dp = readdir(srcdir))!=NULL )    //获取文件夹内文件的信息
 {
  
 
  if(dp->d_name[0] == '.') //如果文件为. 或者 .. 则跳过
  {       
   continue;
  }
  //对本目录下的所有文件进行拷贝操作
  struct file *tmpfile = malloc(sizeof(struct file)); //为文件结构体开辟内存空间
  
  memset(tmpfile,0,sizeof(struct file));    //对内存清零 
  
  sprintf(tmpfile->srcfile,"%s/%s",dofile->srcfile,dp->d_name);//拼凑源文件路径
  sprintf(tmpfile->dstfile,"%s/%s",dofile->dstfile,dp->d_name);//拼凑目标文件路径

  struct stat tmpstat;
  stat(tmpfile->srcfile,&tmpstat);         

  if(S_ISREG(tmpstat.st_mode))      //如果为普通文件,则拷贝
  {
   printf("srcfile = %s\n",tmpfile->srcfile);
   printf("tmpfile->dstfile = %s\n", tmpfile->dstfile);
   
   add_task( pool, copyregfile, tmpfile);   //把复制的任务丢到任务链表
   
   
   
  }
  else if(S_ISDIR(tmpstat.st_mode))//如果为目录,则递归
  {
   copydir(tmpfile,pool);
  }

 }
 return 0;
}


void handler(void *arg)        //防止死锁 上一个博客有细讲
{ 
 pthread_mutex_unlock((pthread_mutex_t *)arg);
}

void *routine(void *arg)       //线程的调用的任务
{
 thread_pool *pool = (thread_pool *)arg;   //强制格式转换 复制给 线程池 
             //其实线程池说白了  就是名人的影分身~ 
 struct task *p;         //任务节点~

 while(1)
 {

  pthread_cleanup_push(handler, (void *)&pool->lock);//预防死锁的机制 上个博客有讲
  pthread_mutex_lock(&pool->lock);     //互斥锁~ 上锁 


  while(pool->waiting_tasks == 0 && !pool->shutdown) //判断有没任务,没有就睡眠~
  {
   pthread_cond_wait(&pool->cond, &pool->lock); //等待一个唤醒的信号
  }


  if(pool->waiting_tasks == 0 && pool->shutdown == true)//判断是不是要关闭线程
  {
   pthread_mutex_unlock(&pool->lock);    //解锁
   pthread_exit(NULL);        //退出线程
  }


  p = pool->task_list->next;   //从线程池中的任务链表里拿一个任务赋值给P
  
  pool->task_list->next = p->next; //然后让原来的任务节点脱离链表 
  
  pool->waiting_tasks--;    //简单的来说就是 从链表里提取了一个任务。拿了以后,链表也就没了这个任务           


  pthread_mutex_unlock(&pool->lock);    
  pthread_cleanup_pop(0);


  pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); //强制性的阻塞 任何的取消
  
 
  (p->task)(p->arg);   //这就是一个函数
     
         //他运行的是add_task( pool, copyregfile, tmpfile); 
         //传过来的参数 
         //p->task 等于copyregfile
         //p->arg 等于  tmpfile
         //想一下为啥p突然有值了~ 这个逻辑很关键 
  
  pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);//关闭强制阻塞
  
  free(p->arg);       //释放在检索目录时 在内存开辟的空间
  free(p);        //释放掉完成任务的节点
 }

 pthread_exit(NULL);
}

bool init_pool(thread_pool *pool, unsigned int threads_number)//初始化线程池
{
 pthread_mutex_init(&pool->lock, NULL);     
 pthread_cond_init(&pool->cond, NULL);

 pool->shutdown = false;
 pool->task_list = malloc(sizeof(struct task));
 pool->tids = malloc(sizeof(pthread_t) * MAX_ACTIVE_THREADS);

 if(pool->task_list == NULL || pool->tids == NULL)
 {
  perror("allocate memory error");
  return false;
 }

 pool->task_list->next = NULL;

 pool->waiting_tasks = 0;
 pool->active_threads = threads_number;

 int i;
 for(i=0; i<pool->active_threads; i++)
 {
  if(pthread_create(&((pool->tids)[i]), NULL,
     routine, (void *)pool) != 0)
  {
   perror("create threads error");
   return false;
  }
 }

 return true;
}

bool add_task(thread_pool *pool, void *(*task)(void *arg) , void *arg)//添加任务
{
 struct task *new_task = malloc(sizeof(struct task));
 if(new_task == NULL)
 {
  perror("allocate memory error");
  return false;
 }
 
 
 new_task->task = task;
 new_task->arg = arg;
 new_task->next = NULL;
 
 
 pthread_mutex_lock(&pool->lock);
 
 if(pool->waiting_tasks >= MAX_WAITING_TASKS)
 {
  pthread_mutex_unlock(&pool->lock);

  fprintf(stderr, "too many tasks.\n");
  free(new_task);

  return false;
 }
 
 struct task *tmp = pool->task_list;
 while(tmp->next != NULL)
  tmp = tmp->next;

 tmp->next = new_task;
 pool->waiting_tasks++;


 pthread_mutex_unlock(&pool->lock);
 pthread_cond_signal(&pool->cond); //唤醒一个休眠的线程

 return true;
}


int add_thread(thread_pool *pool, unsigned additional_threads)  //添加线程
{
 if(additional_threads == 0)
  return 0;

 unsigned total_threads =
       pool->active_threads + additional_threads;

 int i, actual_increment = 0;
 for(i = pool->active_threads;
     i < total_threads && i < MAX_ACTIVE_THREADS;
     i++)
 {
  if(pthread_create(&((pool->tids)[i]),
    NULL, routine, (void *)pool) != 0)
  {
   perror("add threads error");

   if(actual_increment == 0)
    return -1;

   break;
  }
  actual_increment++; 
 }

 pool->active_threads += actual_increment;
 return actual_increment;
}

int remove_thread(thread_pool *pool, unsigned int removing_threads) //删除线程
{
 if(removing_threads == 0)
  return pool->active_threads;

 int remain_threads = pool->active_threads - removing_threads;
 remain_threads = remain_threads>0 ? remain_threads:1;

 int i;
 for(i=pool->active_threads-1; i>remain_threads-1; i--)
 {
  errno = pthread_cancel(pool->tids[i]);
  if(errno != 0)
   break;
 }

 if(i == pool->active_threads-1)
  return -1;
 else
 {
  pool->active_threads = i+1;
  return i+1;
 }
}

bool destroy_pool(thread_pool *pool)       //摧毁线程池
{

 pool->shutdown = true;
 pthread_cond_broadcast(&pool->cond);

 int i;
 for(i=0; i<pool->active_threads; i++)
 {
  errno = pthread_join(pool->tids[i], NULL);
  if(errno != 0)
  {
   printf("join tids[%d] error: %s\n",
     i, strerror(errno));
  }
  else
   printf("[%u] is joined\n", (unsigned)pool->tids[i]);
  
 }

 free(pool->task_list);
 free(pool->tids);
 free(pool);

 return true;
}


void *count_time(void *arg)       //写的一个计算时间的线程。。
{             //后来发现 在运行时加入 time 这个命令,系统会自动算时间
 int i = 0;
 while(1)
 {
  sleep(1);
  printf("sec: %d\n", ++i);
 }
}


int main(int argc,char *argv[])
{
 
 if(argc != 3)
 {
  printf("Please run : ./%s xxx yyy\n",argv[0]);
  return -1;
 }
 
 //计算运行时间
// pthread_t a;
 //pthread_create(&a, NULL, count_time, NULL);
 
 //初始化池
 
 thread_pool *pool = malloc(sizeof(thread_pool));
 init_pool(pool,100);
 
 struct file dofile;
 strcpy(dofile.srcfile,argv[1]);
 strcpy(dofile.dstfile,argv[2]);

 struct stat srcstat;
 stat(dofile.srcfile,&srcstat);
 

 if(S_ISREG(srcstat.st_mode))//如果为普通文件,则拷贝
 {
  copyregfile(&dofile);
  
 }
 else if(S_ISDIR(srcstat.st_mode))//如果为目录,则递归
 {
  copydir(&dofile,pool);
 }
 


 destroy_pool(pool);
 return 0;
 
}


/*     --------*1*---------
先前所描述的st_mode 则定义了下列数种情况:
 S_IFMT 0170000 文件类型的位遮罩
 S_IFSOCK 0140000 scoket
 S_IFLNK 0120000 符号连接
 S_IFREG 0100000 一般文件
 S_IFBLK 0060000 区块装置
 S_IFDIR 0040000 目录
 S_IFCHR 0020000 字符装置
 S_IFIFO 0010000 先进先出

 S_ISUID 04000 文件的(set user-id on execution)位
 S_ISGID 02000 文件的(set group-id on execution)位
 S_ISVTX 01000 文件的sticky位

 S_IRUSR(S_IREAD) 00400 文件所有者具可读取权限
 S_IWUSR(S_IWRITE)00200 文件所有者具可写入权限
 S_IXUSR(S_IEXEC) 00100 文件所有者具可执行权限

 S_IRGRP 00040 用户组具可读取权限
 S_IWGRP 00020 用户组具可写入权限
 S_IXGRP 00010 用户组具可执行权限

 S_IROTH 00004 其他用户具可读取权限
 S_IWOTH 00002 其他用户具可写入权限
 S_IXOTH 00001 其他用户具可执行权限
*/



分享到 :
0 人收藏
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

积分:1136255
帖子:227251
精华:0
期权论坛 期权论坛
发布
内容

下载期权论坛手机APP