fuse_impls_read.c
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:5k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. #include "fuse_dfs.h"
  19. #include "fuse_impls.h"
  20. #include "fuse_file_handle.h"
  21. static size_t min(const size_t x, const size_t y) {
  22.   return x < y ? x : y;
  23. }
  24. /**
  25.  * dfs_read
  26.  *
  27.  * Reads from dfs or the open file's buffer.  Note that fuse requires that
  28.  * either the entire read be satisfied or the EOF is hit or direct_io is enabled
  29.  *
  30.  */
  31. int dfs_read(const char *path, char *buf, size_t size, off_t offset,
  32.                    struct fuse_file_info *fi)
  33. {
  34.   TRACE1("read",path)
  35.   
  36.   // retrieve dfs specific data
  37.   dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data;
  38.   // check params and the context var
  39.   assert(dfs);
  40.   assert(path);
  41.   assert(buf);
  42.   assert(offset >= 0);
  43.   assert(size >= 0);
  44.   assert(fi);
  45.   dfs_fh *fh = (dfs_fh*)fi->fh;
  46.   assert(fh != NULL);
  47.   assert(fh->fs != NULL);
  48.   assert(fh->hdfsFH != NULL);
  49.   // special case this as simplifies the rest of the logic to know the caller wanted > 0 bytes
  50.   if (size == 0)
  51.     return 0;
  52.   // If size is bigger than the read buffer, then just read right into the user supplied buffer
  53.   if ( size >= dfs->rdbuffer_size) {
  54.     int num_read;
  55.     size_t total_read = 0;
  56.     while (size - total_read > 0 && (num_read = hdfsPread(fh->fs, fh->hdfsFH, offset + total_read, buf + total_read, size - total_read)) > 0) {
  57.       total_read += num_read;
  58.     }
  59.     // if there was an error before satisfying the current read, this logic declares it an error
  60.     // and does not try to return any of the bytes read. Don't think it matters, so the code
  61.     // is just being conservative.
  62.     if (total_read < size && num_read < 0) {
  63.       total_read = -EIO;
  64.     }
  65.     return total_read;
  66.   }
  67.   //
  68.   // Critical section - protect from multiple reads in different threads accessing the read buffer
  69.   // (no returns until end)
  70.   //
  71.   pthread_mutex_lock(&fh->mutex);
  72.   // used only to check the postcondition of this function - namely that we satisfy
  73.   // the entire read or EOF is hit.
  74.   int isEOF = 0;
  75.   int ret = 0;
  76.   // check if the buffer is empty or
  77.   // the read starts before the buffer starts or
  78.   // the read ends after the buffer ends
  79.   if (fh->bufferSize == 0  || 
  80.       offset < fh->buffersStartOffset || 
  81.       offset + size > fh->buffersStartOffset + fh->bufferSize) 
  82.     {
  83.       // Read into the buffer from DFS
  84.       int num_read = 0;
  85.       size_t total_read = 0;
  86.       while (dfs->rdbuffer_size  - total_read > 0 &&
  87.              (num_read = hdfsPread(fh->fs, fh->hdfsFH, offset + total_read, fh->buf + total_read, dfs->rdbuffer_size - total_read)) > 0) {
  88.         total_read += num_read;
  89.       }
  90.       // if there was an error before satisfying the current read, this logic declares it an error
  91.       // and does not try to return any of the bytes read. Don't think it matters, so the code
  92.       // is just being conservative.
  93.       if (total_read < size && num_read < 0) {
  94.         // invalidate the buffer 
  95.         fh->bufferSize = 0; 
  96.         syslog(LOG_ERR, "Read error - pread failed for %s with return code %d %s:%d", path, (int)num_read, __FILE__, __LINE__);
  97.         ret = -EIO;
  98.       } else {
  99.         // Either EOF, all read or read beyond size, but then there was an error
  100.         fh->bufferSize = total_read;
  101.         fh->buffersStartOffset = offset;
  102.         if (dfs->rdbuffer_size - total_read > 0) {
  103.           // assert(num_read == 0); this should be true since if num_read < 0 handled above.
  104.           isEOF = 1;
  105.         }
  106.       }
  107.     }
  108.   //
  109.   // NOTE on EOF, fh->bufferSize == 0 and ret = 0 ,so the logic for copying data into the caller's buffer is bypassed, and
  110.   //  the code returns 0 as required
  111.   //
  112.   if (ret == 0 && fh->bufferSize > 0) {
  113.     assert(offset >= fh->buffersStartOffset);
  114.     assert(fh->buf);
  115.     const size_t bufferReadIndex = offset - fh->buffersStartOffset;
  116.     assert(bufferReadIndex >= 0 && bufferReadIndex < fh->bufferSize);
  117.     const size_t amount = min(fh->buffersStartOffset + fh->bufferSize - offset, size);
  118.     assert(amount >= 0 && amount <= fh->bufferSize);
  119.     const char *offsetPtr = fh->buf + bufferReadIndex;
  120.     assert(offsetPtr >= fh->buf);
  121.     assert(offsetPtr + amount <= fh->buf + fh->bufferSize);
  122.     
  123.     memcpy(buf, offsetPtr, amount);
  124.     ret = amount;
  125.   }
  126.   //
  127.   // Critical section end 
  128.   //
  129.   pthread_mutex_unlock(&fh->mutex);
  130.  
  131.   // fuse requires the below and the code should guarantee this assertion
  132.   // 3 cases on return:
  133.   //   1. entire read satisfied
  134.   //   2. partial read and isEOF - including 0 size read
  135.   //   3. error 
  136.   assert(ret == size || isEOF || ret < 0);
  137.  return ret;
  138. }