1.下载地址http://www.viewtier.com/downloads.htm2.sh -y ./paraxxxxxx.shparabuild配置备忘安装位置是/opt/parabuild链接位置是/usr/bin启动命令/etc/init.d/parabuild start永久授权licenseproduct=Vie
本篇博客主要讲解fastdfs中的tracker协议。
fastdfs主要是存储文件,直接把整个文件存储到磁盘上,所以,简单直接。但是也有很大的局限性。
因此,fastdfs对文件的目录设置和存储是最为核心的。
为什么这么突然地讲解这些?因为我已经看了一段时间的fastdfs,主要结构都已经搞得比较清晰了。因此,这篇文章,我就主要以tracker这一部分的协议来分析。
其他具体介绍tracker的请百度。我就不介绍了,我就直接从
[cpp] view
plain copy

-
int tracker_deal_task(struct fast_task_info *pTask)
这个方法开始对每个case分析。
1、storage心跳协议
[cpp] view
plain copy

-
case TRACKER_PROTO_CMD_STORAGE_BEAT:
-
TRACKER_CHECK_LOGINED(pTask)
-
result = tracker_deal_storage_beat(pTask);
-
break;
自然,该协议是从storage层发送给tracker层的数据包,
[cpp] view
plain copy

-
#define TRACKER_PROTO_CMD_STORAGE_BEAT 83 //storage heart beat
那么,storage主要是做了什么:
storage在启动的时候,会开启一个线程,该线程为
[cpp] view
plain copy

-
static void *tracker_report_thread_entrance(void *arg)
该函数主要是根据配置连接它所在组相应的tracker,做一些事情,这里有个while循环,代码如下
[cpp] view
plain copy

-
current_time = g_current_time;
-
if (current_time - last_beat_time >= \
-
g_heart_beat_interval)
-
{
-
if (tracker_heart_beat(pTrackerServer, \
-
&stat_chg_sync_count, \
-
&bServerPortChanged) != 0)
-
{
-
break;
-
}
也就是至少30秒钟来一次心跳,心跳包的主要数据是包头和当前storage的状态信息,
[cpp] view
plain copy

-
char out_buff[sizeof(TrackerHeader) + sizeof(FDFSStorageStatBuff)];
[cpp] view
plain copy

-
-
typedef struct
-
{
-
char sz_total_upload_count[8];
-
char sz_success_upload_count[8];
-
char sz_total_append_count[8];
-
char sz_success_append_count[8];
-
char sz_total_modify_count[8];
-
char sz_success_modify_count[8];
-
char sz_total_truncate_count[8];
-
char sz_success_truncate_count[8];
-
char sz_total_set_meta_count[8];
-
char sz_success_set_meta_count[8];
-
char sz_total_delete_count[8];
-
char sz_success_delete_count[8];
-
char sz_total_download_count[8];
-
char sz_success_download_count[8];
-
char sz_total_get_meta_count[8];
-
char sz_success_get_meta_count[8];
-
char sz_total_create_link_count[8];
-
char sz_success_create_link_count[8];
-
char sz_total_delete_link_count[8];
-
char sz_success_delete_link_count[8];
-
char sz_total_upload_bytes[8];
-
char sz_success_upload_bytes[8];
-
char sz_total_append_bytes[8];
-
char sz_success_append_bytes[8];
-
char sz_total_modify_bytes[8];
-
char sz_success_modify_bytes[8];
-
char sz_total_download_bytes[8];
-
char sz_success_download_bytes[8];
-
char sz_total_sync_in_bytes[8];
-
char sz_success_sync_in_bytes[8];
-
char sz_total_sync_out_bytes[8];
-
char sz_success_sync_out_bytes[8];
-
char sz_total_file_open_count[8];
-
char sz_success_file_open_count[8];
-
char sz_total_file_read_count[8];
-
char sz_success_file_read_count[8];
-
char sz_total_file_write_count[8];
-
char sz_success_file_write_count[8];
-
char sz_last_source_update[8];
-
char sz_last_sync_update[8];
-
char sz_last_synced_timestamp[8];
-
char sz_last_heart_beat_time[8];
-
} FDFSStorageStatBuff;
tracker主要是做了什么呢?
对其进行解包,然后对这个保存在本地的storage的信息进行保存到文件中,调用
[cpp] view
plain copy

-
status = tracker_save_storages();
调用
[cpp] view
plain copy

-
tracker_mem_active_store_server(pClientInfo->pGroup, \
-
pClientInfo->pStorage);
如果group中还没有这个存储服务器,就将其插入到group中。
最后调用
[cpp] view
plain copy

-
static int tracker_check_and_sync(struct fast_task_info *pTask, \
-
const int status)
检查相应的改变状态,并将其同步等。(需要再详细看看)
2、报告相应同步时间
[cpp] view
plain copy

-
#define TRACKER_PROTO_CMD_STORAGE_SYNC_REPORT 89 //report src last synced time as dest server
同样在storage的report线程执行
[cpp] view
plain copy

-
if (sync_time_chg_count != g_sync_change_count && \
-
current_time - last_sync_report_time >= \
-
g_heart_beat_interval)
-
{
-
if (tracker_report_sync_timestamp( \
-
pTrackerServer, &bServerPortChanged)!=0)
-
{
-
break;
-
}
-
-
sync_time_chg_count = g_sync_change_count;
-
last_sync_report_time = current_time;
-
}
具体的数据包为
[cpp] view
plain copy

-
pEnd = g_storage_servers + g_storage_count;
-
for (pServer=g_storage_servers; pServer<pEnd; pServer++)
-
{
-
memcpy(p, pServer->server.id, FDFS_STORAGE_ID_MAX_SIZE);
-
p += FDFS_STORAGE_ID_MAX_SIZE;
-
int2buff(pServer->last_sync_src_timestamp, p);
-
p += 4;
-
}
也就是遍历当前进程中本组的所有storage服务器,把每个服务器的id和上次同步的时间戳发送给tracker服务器。
然后tracker的服务器存储结构为
[cpp] view
plain copy

-
pClientInfo->pGroup->last_sync_timestamps \
-
[src_index][dest_index] = sync_timestamp;
dest_index 值为当前连接所在组的索引值
[cpp] view
plain copy

-
dest_index = tracker_mem_get_storage_index(pClientInfo->pGroup,
-
pClientInfo->pStorage);
-
if (dest_index < 0 || dest_index >= pClientInfo->pGroup->count)
-
{
-
status = 0;
-
break;
-
}
因为 本链接的storage是固定不变的,而src_index就是为本组的其他storage的id索引,
首先通过id,(ip地址)找到具体的storage,然后再通过指针找到索引位置,
最后,调用
[cpp] view
plain copy

-
if (++g_storage_sync_time_chg_count % \
-
TRACKER_SYNC_TO_FILE_FREQ == 0)
-
{
-
status = tracker_save_sync_timestamps();
-
}
-
else
-
{
-
status = 0;
-
}
-
} while (0);
-
-
return tracker_check_and_sync(pTask, status);
定时保存文件和检查等
3、上报磁盘情况
#define TRACKER_PROTO_CMD_STORAGE_REPORT_DISK_USAGE 84 //report disk usage
同样线程定时调用,
[cpp] view
plain copy

-
if (current_time - last_df_report_time >= \
-
g_stat_report_interval)
-
{
-
if (tracker_report_df_stat(pTrackerServer, \
-
&bServerPortChanged) != 0)
-
{
-
break;
-
}
-
-
last_df_report_time = current_time;
-
}
同样上报这些数据
[cpp] view
plain copy

-
for (i=0; i<g_fdfs_store_paths.count; i++)
-
{
-
if (statvfs(g_fdfs_store_paths.paths[i], &sbuf) != 0)
-
{
-
logError("file: "__FILE__", line: %d, " \
-
"call statfs fail, errno: %d, error info: %s.",\
-
__LINE__, errno, STRERROR(errno));
-
-
if (pBuff != out_buff)
-
{
-
free(pBuff);
-
}
-
return errno != 0 ? errno : EACCES;
-
}
-
-
g_path_space_list[i].total_mb = ((int64_t)(sbuf.f_blocks) * \
-
sbuf.f_frsize) / FDFS_ONE_MB;
-
g_path_space_list[i].free_mb = ((int64_t)(sbuf.f_bavail) * \
-
sbuf.f_frsize) / FDFS_ONE_MB;
-
long2buff(g_path_space_list[i].total_mb, pStatBuff->sz_total_mb);
-
long2buff(g_path_space_list[i].free_mb, pStatBuff->sz_free_mb);
-
-
pStatBuff++;
-
}
tracker这边存储在
int64_t *path_total_mbs; //total disk storage in MB
int64_t *path_free_mbs; //free disk storage in MB
这里
[cpp] view
plain copy

-
path_total_mbs[i] = buff2long(pStatBuff->sz_total_mb);
-
path_free_mbs[i] = buff2long(pStatBuff->sz_free_mb);
-
-
pClientInfo->pStorage->total_mb += path_total_mbs[i];
-
pClientInfo->pStorage->free_mb += path_free_mbs[i];
4、storage服加入到tracker
[cpp] view
plain copy

-
#define TRACKER_PROTO_CMD_STORAGE_JOIN 81
storage线程同样在该处调用
[cpp] view
plain copy

-
if (tracker_report_join(pTrackerServer, tracker_index, \
-
sync_old_done) != 0)
-
{
-
sleep(g_heart_beat_interval);
-
continue;
-
}
发送的包体数据包为:
[cpp] view
plain copy

-
typedef struct
-
{
-
char group_name[FDFS_GROUP_NAME_MAX_LEN+1];
-
char storage_port[FDFS_PROTO_PKG_LEN_SIZE];
-
char storage_http_port[FDFS_PROTO_PKG_LEN_SIZE];
-
char store_path_count[FDFS_PROTO_PKG_LEN_SIZE];
-
char subdir_count_per_path[FDFS_PROTO_PKG_LEN_SIZE];
-
char upload_priority[FDFS_PROTO_PKG_LEN_SIZE];
-
char join_time[FDFS_PROTO_PKG_LEN_SIZE];
-
char up_time[FDFS_PROTO_PKG_LEN_SIZE];
-
char version[FDFS_VERSION_SIZE];
-
char domain_name[FDFS_DOMAIN_NAME_MAX_SIZE];
-
char init_flag;
-
signed char status;
-
char tracker_count[FDFS_PROTO_PKG_LEN_SIZE];
-
} TrackerStorageJoinBody;
当赋值完成后,在其后边加入
[cpp] view
plain copy

-
p = out_buff + sizeof(TrackerHeader) + sizeof(TrackerStorageJoinBody);
-
pServerEnd = g_tracker_group.servers + g_tracker_group.server_count;
-
for (pServer=g_tracker_group.servers; pServer<pServerEnd; pServer++)
-
{
-
-
-
-
-
-
-
-
-
-
sprintf(p, "%s:%d", pServer->ip_addr, pServer->port);
-
p += FDFS_PROTO_IP_PORT_SIZE;
-
}
加入所有tracker的服务器信息格式为ip:port
tracker 服务器接收
[cpp] view
plain copy

-
case TRACKER_PROTO_CMD_STORAGE_JOIN:
-
result = tracker_deal_storage_join(pTask);
-
break;
获取到的相关信息存储到
[cpp] view
plain copy

-
typedef struct
-
{
-
int storage_port;
-
int storage_http_port;
-
int store_path_count;
-
int subdir_count_per_path;
-
int upload_priority;
-
int join_time;
-
int up_time;
-
char version[FDFS_VERSION_SIZE];
-
char group_name[FDFS_GROUP_NAME_MAX_LEN + 1];
-
char domain_name[FDFS_DOMAIN_NAME_MAX_SIZE];
-
char init_flag;
-
signed char status;
-
int tracker_count;
-
ConnectionInfo tracker_servers[FDFS_MAX_TRACKERS];
-
} FDFSStorageJoinBody;
这些结构体内
同时插入本地内存
result = tracker_mem_add_group_and_storage(pClientInfo, \
pTask->client_ip, &joinBody, true);
同时把发消息包的id传过来
[cpp] view
plain copy

-
pJoinBodyResp = (TrackerStorageJoinBodyResp *)(pTask->data + \
-
sizeof(TrackerHeader));
-
memset(pJoinBodyResp, 0, sizeof(TrackerStorageJoinBodyResp));
-
-
if (pClientInfo->pStorage->psync_src_server != NULL)
-
{
-
strcpy(pJoinBodyResp->src_id, \
-
pClientInfo->pStorage->psync_src_server->id);
-
}
5、报告存储状态
[cpp] view
plain copy

-
#define TRACKER_PROTO_CMD_STORAGE_REPORT_STATUS 76 //report specified storage server status
storage服务器调用
[cpp] view
plain copy

-
int tracker_report_storage_status(ConnectionInfo *pTrackerServer, \
-
FDFSStorageBrief *briefServer)
内容主要是组名字
[cpp] view
plain copy

-
strcpy(out_buff + sizeof(TrackerHeader), g_group_name);
和简要信息
[cpp] view
plain copy

-
memcpy(out_buff + sizeof(TrackerHeader) + FDFS_GROUP_NAME_MAX_LEN, \
-
briefServer, sizeof(FDFSStorageBrief));
其结构体如下
[cpp] view
plain copy

-
typedef struct
-
{
-
char status;
-
char port[4];
-
char id[FDFS_STORAGE_ID_MAX_SIZE];
-
char ip_addr[IP_ADDRESS_SIZE];
-
} FDFSStorageBrief;
6、从tracker获取storage状态。
[cpp] view
plain copy

-
#define TRACKER_PROTO_CMD_STORAGE_GET_STATUS 71 //get storage status from tracker
该协议是由client发起
调用流程如下:
[cpp] view
plain copy

-
int tracker_get_storage_status(ConnectionInfo *pTrackerServer, \
-
const char *group_name, const char *ip_addr, \
-
FDFSStorageBrief *pDestBuff)
-
int tracker_get_storage_max_status(TrackerServerGroup *pTrackerGroup, \
-
const char *group_name, const char *ip_addr, \
-
char *storage_id, int *status)
-
int tracker_get_storage_status(ConnectionInfo *pTrackerServer, \
-
const char *group_name, const char *ip_addr, \
-
FDFSStorageBrief *pDestBuff)
获取自己的状态,
包体格式 组名 ip的字符串
tracker通过获取了相应的数据,查找到storage的信息
结构体为:
[cpp] view
plain copy

-
typedef struct
-
{
-
char status;
-
char port[4];
-
char id[FDFS_STORAGE_ID_MAX_SIZE];
-
char ip_addr[IP_ADDRESS_SIZE];
-
} FDFSStorageBrief;
赋值后,返回
7、通过tracker获取storageid
#define TRACKER_PROTO_CMD_STORAGE_GET_SERVER_ID 70 //get storage server id from tracker
和上一协议请求一样,由 groupname+ip 组成。
tracker处理方法
[cpp] view
plain copy

-
static int tracker_deal_get_storage_id(struct fast_task_info *pTask)
tracker最后通过
[cpp] view
plain copy

-
FDFSStorageIdInfo *fdfs_get_storage_id_by_ip(const char *group_name, \
-
const char *pIpAddr)
-
{
-
FDFSStorageIdInfo target;
-
memset(&target, 0, sizeof(FDFSStorageIdInfo));
-
snprintf(target.group_name, sizeof(target.group_name), "%s", group_name);
-
snprintf(target.ip_addr, sizeof(target.ip_addr), "%s", pIpAddr);
-
return (FDFSStorageIdInfo *)bsearch(&target, g_storage_ids_by_ip, \
-
g_storage_id_count, sizeof(FDFSStorageIdInfo), \
-
fdfs_cmp_group_name_and_ip);
-
}
该方法获取了
[cpp] view
plain copy

-
FDFSStorageIdInfo
信息,然后赋值,返回。
8、通过tracker获取所有storage服务器
[cpp] view
plain copy

-
#define TRACKER_PROTO_CMD_STORAGE_FETCH_STORAGE_IDS 69 //get all storage ids from tracker
[cpp] view
plain copy

-
for (i=0; i<5; i++)
-
{
-
for (pGServer=pServerStart; pGServer<pServerEnd; pGServer++)
-
{
-
memcpy(pTServer, pGServer, sizeof(ConnectionInfo));
-
pTServer->sock = -1;
-
result = fdfs_get_storage_ids_from_tracker_server(pTServer);
-
if (result == 0)
-
{
-
return result;
-
}
-
}
-
-
if (pServerStart != pTrackerGroup->servers)
-
{
-
pServerStart = pTrackerGroup->servers;
-
}
-
sleep(1);
-
}
调用顺序
[cpp] view
plain copy

-
int storage_func_init(const char *filename, char *bind_addr, const int addr_size)
-
int fdfs_get_storage_ids_from_tracker_group(TrackerServerGroup *pTrackerGroup)
-
int fdfs_get_storage_ids_from_tracker_server(ConnectionInfo *pTrackerServer)
tracker函数,每秒钟调用一次,遍历所有的trackers服务器
tracker服务器获取
[cpp] view
plain copy

-
case TRACKER_PROTO_CMD_STORAGE_FETCH_STORAGE_IDS:
-
result = tracker_deal_fetch_storage_ids(pTask);
-
break;
然后通过这种协议格式
[cpp] view
plain copy

-
返回的数据
pIdsStart = g_storage_ids_by_ip + start_index;
-
pIdsEnd = g_storage_ids_by_ip + g_storage_id_count;
-
for (pIdInfo = pIdsStart; pIdInfo < pIdsEnd; pIdInfo++)
-
{
-
if ((int)(p - pTask->data) > pTask->size - 64)
-
{
-
break;
-
}
-
-
p += sprintf(p, "%s %s %s\n", pIdInfo->id, \
-
pIdInfo->group_name, pIdInfo->ip_addr);
-
}
返回给请求者。
9、回复给新的storage
[cpp] view
plain copy

-
#define TRACKER_PROTO_CMD_STORAGE_REPLICA_CHG 85 //repl new storage servers
storage服务器调用流程:
剩下的协议
[cpp] view
plain copy

-
static int tracker_merge_servers(ConnectionInfo *pTrackerServer, \
-
FDFSStorageBrief *briefServers, const int server_count)
Copyright ©
版权所有 本站资源均来自互联网或会员发布,如果侵犯了您的权益请与我们联系,我们将在24小时内删除!谢谢!