当前位置:首页 > 芯闻号 > 充电吧
[导读]考虑到移植以及灵活性,LevelDB将系统相关的处理(文件/进程/时间)抽象成Evn,用户可以自己实现相应的接口,作为option的一部分传入,默认使用自带的实现。 env.h中声明了:虚基类env,


考虑到移植以及灵活性,LevelDB将系统相关的处理(文件/进程/时间)抽象成Evn,用户可以自己实现相应的接口,作为option的一部分传入,默认使用自带的实现。 
env.h中声明了:

虚基类env,在env_posix.cc中,派生类PosixEnv继承自env类,是LevelDB的默认实现。虚基类WritableFile、SequentialFile、RandomAccessFile,分别是文件的写抽象类,顺序读抽象类和随机读抽象类类Logger,log文件的写入接口,log文件是防止系统异常终止造成数据丢失,是memtable在磁盘的备份类FileLock,为文件上锁WriteStringToFile、ReadFileToString、Log三个全局函数,封装了上述接口

下面来看看env_posix.cc中为我们写好的默认实现

顺序读:


class PosixSequentialFile: public SequentialFile {
 private:
  std::string filename_;
  FILE* file_;

 public:
  PosixSequentialFile(const std::string& fname, FILE* f)
    : filename_(fname), file_(f) { }
  virtual ~PosixSequentialFile() { fclose(file_); }
  // 从文件中读取n个字节存放到 "scratch[0..n-1]", 然后将"scratch[0..n-1]"转化为Slice类型并存放到*result中
  // 如果正确读取,则返回OK status,否则返回non-OK status
  virtual Status Read(size_t n, Slice* result, char* scratch) {
  Status s;
#ifdef BSD
  // fread_unlocked doesn't exist on FreeBSD
  size_t r = fread(scratch, 1, n, file_);
#else
  // size_t fread_unlocked(void *ptr, size_t size, size_t n,FILE *stream);
  // ptr:用于接收数据的内存地址
  // size:要读的每个数据项的字节数,单位是字节
  // n:要读n个数据项,每个数据项size个字节
  // stream:输入流
  // 返回值:返回实际读取的数据大小
  // 因为函数名带了"_unlocked"后缀,所以它不是线程安全的
  size_t r = fread_unlocked(scratch, 1, n, file_);
#endif
  // Slice的第二个参数要用实际读到的数据大小,因为读到文件尾部,剩下的字节数可能小于n
  *result = Slice(scratch, r);
  if (r < n) {
    if (feof(file_)) {
    // We leave status as ok if we hit the end of the file
    // 如果r<n,且feof(file_)非零,说明到了文件结尾,什么都不用做,函数结束后会返回OK Status
    } else {
    // A partial read with an error: return a non-ok status
    // 否则返回错误信息
    s = Status::IOError(filename_, strerror(errno));
    }
  }
  return s;
  }
  // 跳过n字节的内容,这并不比读取n字节的内容慢,而且会更快。
  // 如果到达了文件尾部,则会停留在文件尾部,并返回OK Status。
  // 否则,返回错误信息
  virtual Status Skip(uint64_t n) {
   // int fseek(FILE *stream, long offset, int origin);
   // stream:文件指针
   // offset:偏移量,整数表示正向偏移,负数表示负向偏移
   // origin:设定从文件的哪里开始偏移, 可能取值为:SEEK_CUR、 SEEK_END 或 SEEK_SET
   // SEEK_SET: 文件开头
   // SEEK_CUR: 当前位置
   // SEEK_END: 文件结尾
   // 其中SEEK_SET, SEEK_CUR和SEEK_END和依次为0,1和2.
   // 举例:
   // fseek(fp, 100L, 0); 把fp指针移动到离文件开头100字节处;
   // fseek(fp, 100L, 1); 把fp指针移动到离文件当前位置100字节处;
   // fseek(fp, 100L, 2); 把fp指针退回到离文件结尾100字节处。
   // 返回值:成功返回0,失败返回非0
  if (fseek(file_, n, SEEK_CUR)) {
    return Status::IOError(filename_, strerror(errno));
  }
  return Status::OK();
  }
};

这就是LevelDB从磁盘顺序读取文件的接口了,用的是C的流文件操作和FILE结构体。需要注意的是Read接口读取文件时不会锁住文件流,因此外部的并发访问需要自行提供并发控制。
随机读:


class PosixRandomAccessFile: public RandomAccessFile {
 private:
  std::string filename_;
  int fd_;
  mutable boost::mutex mu_;

 public:
  PosixRandomAccessFile(const std::string& fname, int fd)
    : filename_(fname), fd_(fd) { }
  virtual ~PosixRandomAccessFile() { close(fd_); }
  // 这里与顺序读的同名函数相比,多了一个参数offset,offset用来指定
  // 读取位置距离文件起始位置的偏移量,这样就可以实现随机读了。
  virtual Status Read(uint64_t offset, size_t n, Slice* result,
            char* scratch) const {
    Status s;
#ifdef WIN32
    // no pread on Windows so we emulate it with a mutex
    boost::unique_locklock(mu_);

    if (::_lseeki64(fd_, offset, SEEK_SET) == -1L) {
      return Status::IOError(filename_, strerror(errno));
    }
	// int _read(int _FileHandle, void * _DstBuf, unsigned int _MaxCharCount)
	// _FileHandle:文件描述符
	// _DstBuf:保存读取数据的缓冲区
	// _MaxCharCount:读取的字节数
	// 返回值:成功返回读取的字节数,出错返回-1并设置errno。
    int r = ::_read(fd_, scratch, n);
    *result = Slice(scratch, (r < 0) ? 0 : r);
    lock.unlock();
#else
	// 在非windows系统上使用pread进行随机读,为何此时不用锁呢?详见下文分析
    ssize_t r = pread(fd_, scratch, n, static_cast(offset));
    *result = Slice(scratch, (r < 0) ? 0 : r);
#endif
    if (r < 0) {
      // An error: return a non-ok status
      s = Status::IOError(filename_, strerror(errno));
    }
    return s;
  }
};



可以看到的是,PosixRandomAccessFile 在非windows系统上使用了 pread 来实现原子的定位加访问功能。常规的随机访问文件的过程可以分为两步,fseek (seek) 定位到访问点,调用 fread (read) 来从特定位置开始访问 FILE* (fd)。然而,这两个操作组合在一起并不是原子的,即 fseek 和 fread 之间可能会插入其他线程的文件操作。相比之下 pread 由系统来保证实现原子的定位和读取组合功能。需要注意的是,pread 操作不会更新文件指针。

需要注意的是,在随机读和顺序读中,分别用fd和FILE *来表示一个文件。文件描述符(file descriptor)是系统层的概念, fd 对应于系统打开文件表里面的一个文件;FILE* 是应用层的概念,其包含了应用层操作文件的数据结构。

顺序写:


class BoostFile : public WritableFile {

public:
  explicit BoostFile(std::string path) : path_(path), written_(0) {
    Open();
  }

  virtual ~BoostFile() {
    Close();
  }

private:
  void Open() {
    // we truncate the file as implemented in env_posix
	 // trunc:先将文件中原有的内容清空
	 // out:为输出(写)而打开文件
	 // binary:以二进制方式打开文件
     file_.open(path_.generic_string().c_str(), 
         std::ios_base::trunc | std::ios_base::out | std::ios_base::binary);
     written_ = 0;
  }

public:
  virtual Status Append(const Slice& data) {
    Status result;
    file_.write(data.data(), data.size());
    if (!file_.good()) {
      result = Status::IOError(
          path_.generic_string() + " Append", "cannot write");
    }
    return result;
  }

  virtual Status Close() {
    Status result;

    try {
      if (file_.is_open()) {
        Sync();
	// 关闭流时,缓冲区中的数据会自动写入到文件
	// 上面调用Sync()强制刷新,是为了确保数据写入,防止数据丢失
        file_.close();
      }
    } catch (const std::exception & e) {
      result = Status::IOError(path_.generic_string() + " close", e.what());
    }

    return result;
  }

  virtual Status Flush() {
    file_.flush();
    return Status::OK();
  }
  // 手动刷新(清空输出缓冲区,并把缓冲区内容同步到文件)
  virtual Status Sync() {
    Status result;
    try {
      Flush();
    } catch (const std::exception & e) {
      result = Status::IOError(path_.string() + " sync", e.what());
    }

    return result;
  }

private:
  boost::filesystem::path path_;
  boost::uint64_t written_;
  std::ofstream file_;
};


关于ofstream::flush和ofstream::Close的区别,详见:C++之ofstream::flush与ofstream::close

文件锁:


class BoostFileLock : public FileLock {
 public:
  boost::interprocess::file_lock fl_;
};
virtual Status LockFile(const std::string& fname, FileLock** lock) {
    *lock = NULL;

    Status result;

    try {
      if (!boost::filesystem::exists(fname)) {
        std::ofstream of(fname, std::ios_base::trunc | std::ios_base::out);
      }

      assert(boost::filesystem::exists(fname));

      boost::interprocess::file_lock fl(fname.c_str());
      BoostFileLock * my_lock = new BoostFileLock();
      my_lock->fl_ = std::move(fl);
      if (my_lock->fl_.try_lock())
        *lock = my_lock;
      else
        result = Status::IOError("acquiring lock " + fname + " failed");
    } catch (const std::exception & e) {
      result = Status::IOError("lock " + fname, e.what());
    }

    return result;
  }
 virtual Status UnlockFile(FileLock* lock) {

    Status result;

    try {
      BoostFileLock * my_lock = static_cast(lock);
      my_lock->fl_.unlock();
      delete my_lock;
    } catch (const std::exception & e) {
      result = Status::IOError("unlock", e.what());
    }

    return result;
  }

文件的锁操作是调用Boost的锁实现的。加锁是为了防止多进程的并发冲突,如果加锁失败,*lock=NULL,且返回non-OK;如果加锁成功,*lock存放的的是锁的指针,并返回OK。如果进程退出,锁会自动释放,否则用户需要调用UnlockFile显式的释放锁。


这几个方法都非常简单,比较晦涩的是这句:my_lock->std::move(f1),从函数名来看,是要移动f1。其实std::move是C++11标准库在

计划任务:

PosixEnv还有一个很重要的功能,计划任务,也就是后台的compaction线程。compaction就是压缩合并的意思,在LevelDB源码分析之六:skiplist(2)中也有提到。对于LevelDB来说,写入记录操作很简单,删除记录仅仅写入一个删除标记就算完事,但是读取记录比较复杂,需要在内存以及各个层级文件中依照新鲜程度依次查找,代价很高。为了加快读取速度,LevelDB采取了compaction的方式来对已有的记录进行整理压缩,通过这种方式,来删除掉一些不再有效的KV数据,减小数据规模,减少文件数量等。

PosixEnv中定义了一个任务队列:


  struct BGItem { void* arg; void (*function)(void*); };
  //用的是deque双端队列作为底层的数据结构
  typedef std::dequeBGQueue;
  BGQueue queue_;

主线程一旦判定需要进行compaction操作,就把compaction任务压进队列queue_中,BGItem是存有任务函数和db对象指针的结构。而后台线程从一开始就不断根据队列中的函数指针执行compaction任务。BGThread()函数就是不停的在queue_中取出函数指针,执行。



后台进程一直执行queue_中的任务,由于queue_是动态的,自然需要考虑queue_空了怎么办,LevelDB采用的是条件变量boost::condition_variable bgsignal_,队列空了就进入等待,直至有新的任务加入进来。而条件变量一般是要和boost::mutex mu_搭配使用,防止某些逻辑错误。


 // BGThread函数的包装,里面调用的就是BGThread函数
  static void* BGThreadWrapper(void* arg) {
    reinterpret_cast(arg)->BGThread();
    return NULL;
  }
void PosixEnv::Schedule(void (*function)(void*), void* arg) {
  boost::unique_locklock(mu_);

  // Start background thread if necessary
  if (!bgthread_) {
     bgthread_.reset(
         new boost::thread(boost::bind(&PosixEnv::BGThreadWrapper, this)));
  }

  // Add to priority queue
  // 将任务压进队列中
  queue_.push_back(BGItem());
  queue_.back().function = function;
  queue_.back().arg = arg;

  lock.unlock();

  bgsignal_.notify_one();

}
void PosixEnv::BGThread() {
  while (true) {
  // 加锁,防止并发冲突
  boost::unique_locklock(mu_);
  // 如果队列为空,等待,直到收到通知(notification)
  while (queue_.empty()) {
    bgsignal_.wait(lock);
  }
  // 从队列头取出任务的函数及其参数
  void (*function)(void*) = queue_.front().function;
  void* arg = queue_.front().arg;
  queue_.pop_front();

  lock.unlock();
  // 调用函数
  (*function)(arg);
  }
}

此外PosixEnv中还有FileExists、GetChildren、DeleteFile、CreateDir、DeleteDir、GetFileSize、RenameFile等等函数,他们见名知义,都是调用Boot的相应函数实现的。


EnvWrapper:

在levelDB中还实现了一个EnvWrapper类,该类继承自Env,且只有一个成员函数Env* target_,该类的所有变量都调用Env类相应的成员变量,我们知道,Env是一个抽象类,是不能定义Env 类型的对象的。我们传给EnvWrapper 的构造函数的类型是PosixEnv,所以,最后调用的都是PosixEnv类的成员变量,你可能已经猜到了,这就是设计模式中的代理模式,EnvWrapper只是进行了简单的封装,它的代理了Env的子类PosixEnv。
EnvWrapper和Env与PosixEnv的关系如下:


由于篇幅限制,Env中的Logger类就放在后面分析了,参考:LevelDB源码分析之十:LOG文件,从env给我的收获就是:

利用虚基类的特性提供了默认的实现,也开放了用户自定义操作的权限面向对象编程范式的学习,把一切操作定义成类文件的加锁解锁,线程的同步C的文件流操作,对文件名的字符提取操作,创建、删除文件和路径,这些都可以直接用到将来自己的项目中参考.



本站声明: 本文章由作者或相关机构授权发布,目的在于传递更多信息,并不代表本站赞同其观点,本站亦不保证或承诺内容真实性等。需要转载请联系该专栏作者,如若文章内容侵犯您的权益,请及时联系本站删除。
换一批
延伸阅读

9月2日消息,不造车的华为或将催生出更大的独角兽公司,随着阿维塔和赛力斯的入局,华为引望愈发显得引人瞩目。

关键字: 阿维塔 塞力斯 华为

加利福尼亚州圣克拉拉县2024年8月30日 /美通社/ -- 数字化转型技术解决方案公司Trianz今天宣布,该公司与Amazon Web Services (AWS)签订了...

关键字: AWS AN BSP 数字化

伦敦2024年8月29日 /美通社/ -- 英国汽车技术公司SODA.Auto推出其旗舰产品SODA V,这是全球首款涵盖汽车工程师从创意到认证的所有需求的工具,可用于创建软件定义汽车。 SODA V工具的开发耗时1.5...

关键字: 汽车 人工智能 智能驱动 BSP

北京2024年8月28日 /美通社/ -- 越来越多用户希望企业业务能7×24不间断运行,同时企业却面临越来越多业务中断的风险,如企业系统复杂性的增加,频繁的功能更新和发布等。如何确保业务连续性,提升韧性,成...

关键字: 亚马逊 解密 控制平面 BSP

8月30日消息,据媒体报道,腾讯和网易近期正在缩减他们对日本游戏市场的投资。

关键字: 腾讯 编码器 CPU

8月28日消息,今天上午,2024中国国际大数据产业博览会开幕式在贵阳举行,华为董事、质量流程IT总裁陶景文发表了演讲。

关键字: 华为 12nm EDA 半导体

8月28日消息,在2024中国国际大数据产业博览会上,华为常务董事、华为云CEO张平安发表演讲称,数字世界的话语权最终是由生态的繁荣决定的。

关键字: 华为 12nm 手机 卫星通信

要点: 有效应对环境变化,经营业绩稳中有升 落实提质增效举措,毛利润率延续升势 战略布局成效显著,战新业务引领增长 以科技创新为引领,提升企业核心竞争力 坚持高质量发展策略,塑强核心竞争优势...

关键字: 通信 BSP 电信运营商 数字经济

北京2024年8月27日 /美通社/ -- 8月21日,由中央广播电视总台与中国电影电视技术学会联合牵头组建的NVI技术创新联盟在BIRTV2024超高清全产业链发展研讨会上宣布正式成立。 活动现场 NVI技术创新联...

关键字: VI 传输协议 音频 BSP

北京2024年8月27日 /美通社/ -- 在8月23日举办的2024年长三角生态绿色一体化发展示范区联合招商会上,软通动力信息技术(集团)股份有限公司(以下简称"软通动力")与长三角投资(上海)有限...

关键字: BSP 信息技术
关闭
关闭