iSulad/src/connect/client/grpc/grpc_containers_client.cc

2234 lines
74 KiB
C++
Raw Normal View History

2019-09-30 10:53:41 -04:00
/******************************************************************************
* Copyright (c) Huawei Technologies Co., Ltd. 2018-2019. All rights reserved.
* iSulad licensed under the Mulan PSL v1.
* You can use this software according to the terms and conditions of the Mulan PSL v1.
* You may obtain a copy of Mulan PSL v1 at:
* http://license.coscl.org.cn/MulanPSL
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
* PURPOSE.
* See the Mulan PSL v1 for more details.
* Author: lifeng
* Create: 2018-11-08
* Description: provide grpc containers client functions
******************************************************************************/
#include "grpc_containers_client.h"
#include <exception>
#include <fstream>
#include <memory>
#include <sstream>
#include <string>
#include <thread>
#include "container_copy_to_request.h"
#include "container_exec_request.h"
#include "utils.h"
#include "lcrdtar.h"
#include "stoppable_thread.h"
#include "container.grpc.pb.h"
#include "client_base.h"
#include "pack_config.h"
using namespace containers;
using grpc::Channel;
using grpc::ClientContext;
using grpc::ClientReader;
using grpc::ClientReaderWriter;
using grpc::ClientWriter;
using grpc::Status;
using grpc::StatusCode;
using google::protobuf::Timestamp;
/*
 * Client for the ContainerService "Version" RPC.
 * Converts the grpc VersionResponse into the C-style lcrc_version_response.
 */
class ContainerVersion : public ClientBase<ContainerService, ContainerService::Stub, lcrc_version_request,
                                           VersionRequest, lcrc_version_response, VersionResponse> {
public:
    explicit ContainerVersion(void *args)
        : ClientBase(args)
    {
    }
    ~ContainerVersion() = default;

    // Copy every non-empty string field of the grpc reply into the response
    // (via util_strdup_s) and record the server return code. Always returns 0.
    int response_from_grpc(VersionResponse *gresponse, lcrc_version_response *response) override
    {
        const auto &version = gresponse->version();
        if (!version.empty()) {
            response->version = util_strdup_s(version.c_str());
        }

        const auto &commit = gresponse->git_commit();
        if (!commit.empty()) {
            response->git_commit = util_strdup_s(commit.c_str());
        }

        const auto &built_at = gresponse->build_time();
        if (!built_at.empty()) {
            response->build_time = util_strdup_s(built_at.c_str());
        }

        const auto &root = gresponse->root_path();
        if (!root.empty()) {
            response->root_path = util_strdup_s(root.c_str());
        }

        const auto &server_msg = gresponse->errmsg();
        if (!server_msg.empty()) {
            response->errmsg = util_strdup_s(server_msg.c_str());
        }

        response->server_errono = gresponse->cc();
        return 0;
    }

    // Perform the Version call through the service stub.
    Status grpc_call(ClientContext *context, const VersionRequest &req, VersionResponse *reply) override
    {
        return stub_->Version(context, req, reply);
    }
};
/*
 * Client for the ContainerService "Info" RPC.
 * Converts the grpc InfoResponse into the C-style lcrc_info_response.
 */
class ContainerInfo : public ClientBase<ContainerService, ContainerService::Stub, lcrc_info_request, InfoRequest,
                                        lcrc_info_response, InfoResponse> {
public:
    explicit ContainerInfo(void *args)
        : ClientBase(args)
    {
    }
    ~ContainerInfo() = default;

    // Fill the response from the grpc reply: counters are copied directly,
    // string fields are duplicated only when non-empty. Always returns 0.
    int response_from_grpc(InfoResponse *gresponse, lcrc_info_response *response) override
    {
        const auto &version = gresponse->version();
        if (!version.empty()) {
            response->version = util_strdup_s(version.c_str());
        }

        // Container / image counters
        response->containers_num = gresponse->containers_num();
        response->c_running = gresponse->c_running();
        response->c_paused = gresponse->c_paused();
        response->c_stopped = gresponse->c_stopped();
        response->images_num = gresponse->images_num();

        copy_os_info(response, gresponse);

        const auto &log_driver = gresponse->logging_driver();
        if (!log_driver.empty()) {
            response->logging_driver = util_strdup_s(log_driver.c_str());
        }

        const auto &huge_page = gresponse->huge_page_size();
        if (!huge_page.empty()) {
            response->huge_page_size = util_strdup_s(huge_page.c_str());
        }

        const auto &root_dir = gresponse->isulad_root_dir();
        if (!root_dir.empty()) {
            response->isulad_root_dir = util_strdup_s(root_dir.c_str());
        }

        response->total_mem = gresponse->total_mem();

        copy_proxy_info(response, gresponse);
        return 0;
    }

    // Perform the Info call through the service stub.
    Status grpc_call(ClientContext *context, const InfoRequest &req, InfoResponse *reply) override
    {
        return stub_->Info(context, req, reply);
    }

private:
    // Copy the kernel / OS / hardware related fields of the reply.
    void copy_os_info(lcrc_info_response *response, InfoResponse *gresponse)
    {
        const auto &kernel = gresponse->kversion();
        if (!kernel.empty()) {
            response->kversion = util_strdup_s(kernel.c_str());
        }

        const auto &os_type = gresponse->os_type();
        if (!os_type.empty()) {
            response->os_type = util_strdup_s(os_type.c_str());
        }

        const auto &arch = gresponse->architecture();
        if (!arch.empty()) {
            response->architecture = util_strdup_s(arch.c_str());
        }

        const auto &node = gresponse->nodename();
        if (!node.empty()) {
            response->nodename = util_strdup_s(node.c_str());
        }

        response->cpus = gresponse->cpus();

        const auto &os_name = gresponse->operating_system();
        if (!os_name.empty()) {
            response->operating_system = util_strdup_s(os_name.c_str());
        }

        const auto &cgroup = gresponse->cgroup_driver();
        if (!cgroup.empty()) {
            response->cgroup_driver = util_strdup_s(cgroup.c_str());
        }
    }

    // Copy the http/https/no proxy settings of the reply.
    void copy_proxy_info(lcrc_info_response *response, InfoResponse *gresponse)
    {
        const auto &http = gresponse->http_proxy();
        if (!http.empty()) {
            response->http_proxy = util_strdup_s(http.c_str());
        }

        const auto &https = gresponse->https_proxy();
        if (!https.empty()) {
            response->https_proxy = util_strdup_s(https.c_str());
        }

        const auto &exclusions = gresponse->no_proxy();
        if (!exclusions.empty()) {
            response->no_proxy = util_strdup_s(exclusions.c_str());
        }
    }
};
/*
 * Client for the ContainerService "Create" RPC.
 * Packs the host and container configuration as JSON strings into the request.
 */
class ContainerCreate : public ClientBase<ContainerService, ContainerService::Stub, lcrc_create_request, CreateRequest,
                                          lcrc_create_response, CreateResponse> {
public:
    explicit ContainerCreate(void *args)
        : ClientBase(args)
    {
    }
    ~ContainerCreate() = default;

    // Translate the C request into a grpc CreateRequest.
    // Returns -1 for a null request, EINVALIDARGS when either configuration
    // cannot be packed, 0 otherwise.
    int request_to_grpc(const lcrc_create_request *request, CreateRequest *grequest) override
    {
        if (request == nullptr) {
            return -1;
        }

        // Optional plain string fields
        if (const char *name = request->name) {
            grequest->set_id(name);
        }
        if (const char *rootfs = request->rootfs) {
            grequest->set_rootfs(rootfs);
        }
        if (const char *image = request->image) {
            grequest->set_image(image);
        }
        if (const char *runtime = request->runtime) {
            grequest->set_runtime(runtime);
        }

        // Host configuration, serialized by generate_hostconfig
        char *packed_host = nullptr;
        if (generate_hostconfig(request->hostconfig, &packed_host) != 0) {
            ERROR("Failed to pack host config");
            return EINVALIDARGS;
        }
        grequest->set_hostconfig(packed_host);
        free(packed_host);

        // Container configuration, serialized by generate_container_config
        char *packed_config = nullptr;
        if (generate_container_config(request->config, &packed_config) != 0) {
            ERROR("Failed to pack custom config");
            return EINVALIDARGS;
        }
        grequest->set_customconfig(packed_config);
        free(packed_config);

        return 0;
    }

    // Copy server return code, error message and container id into the response.
    int response_from_grpc(CreateResponse *gresponse, lcrc_create_response *response) override
    {
        response->server_errono = gresponse->cc();

        const auto &server_msg = gresponse->errmsg();
        if (!server_msg.empty()) {
            response->errmsg = util_strdup_s(server_msg.c_str());
        }

        const auto &created_id = gresponse->id();
        if (!created_id.empty()) {
            response->id = util_strdup_s(created_id.c_str());
        }
        return 0;
    }

    // Validate the grpc request: a runtime, at least one of rootfs/image,
    // a hostconfig and a customconfig are all required. Returns -1 on the
    // first missing item, 0 when the request is complete.
    int check_parameter(const CreateRequest &req) override
    {
        if (req.runtime().empty()) {
            ERROR("Missing runtime in the request");
            return -1;
        }

        const bool has_source = !req.rootfs().empty() || !req.image().empty();
        if (!has_source) {
            ERROR("Missing container rootfs or image arguments in the request");
            return -1;
        }

        if (req.hostconfig().empty()) {
            ERROR("Missing hostconfig in the request");
            return -1;
        }

        if (req.customconfig().empty()) {
            ERROR("Missing customconfig in the request");
            return -1;
        }

        return 0;
    }

    // Perform the Create call through the service stub.
    Status grpc_call(ClientContext *context, const CreateRequest &req, CreateResponse *reply) override
    {
        return stub_->Create(context, req, reply);
    }
};
/*
 * Client for the ContainerService "Start" RPC.
 */
class ContainerStart : public ClientBase<ContainerService, ContainerService::Stub, lcrc_start_request, StartRequest,
                                         lcrc_start_response, StartResponse> {
public:
    explicit ContainerStart(void *args)
        : ClientBase(args)
    {
    }
    ~ContainerStart() = default;

    // Translate the C request into a grpc StartRequest.
    // Returns -1 for a null request, 0 otherwise.
    int request_to_grpc(const lcrc_start_request *request, StartRequest *grequest) override
    {
        if (request == nullptr) {
            return -1;
        }

        if (const char *name = request->name) {
            grequest->set_id(name);
        }

        // Optional stdio values; only forwarded when set
        if (const char *in_path = request->stdin) {
            grequest->set_stdin(in_path);
        }
        if (const char *out_path = request->stdout) {
            grequest->set_stdout(out_path);
        }
        if (const char *err_path = request->stderr) {
            grequest->set_stderr(err_path);
        }

        grequest->set_attach_stdin(request->attach_stdin);
        grequest->set_attach_stdout(request->attach_stdout);
        grequest->set_attach_stderr(request->attach_stderr);
        return 0;
    }

    // Copy the server return code and, when present, the error message.
    int response_from_grpc(StartResponse *gresponse, struct lcrc_start_response *response) override
    {
        response->server_errono = gresponse->cc();

        const auto &server_msg = gresponse->errmsg();
        if (server_msg.empty()) {
            return 0;
        }
        response->errmsg = util_strdup_s(server_msg.c_str());
        return 0;
    }

    // The container id is mandatory; returns -1 when it is missing.
    int check_parameter(const StartRequest &req) override
    {
        if (!req.id().empty()) {
            return 0;
        }
        ERROR("Missing container name in the request");
        return -1;
    }

    // Perform the Start call through the service stub.
    Status grpc_call(ClientContext *context, const StartRequest &req, StartResponse *reply) override
    {
        return stub_->Start(context, req, reply);
    }
};
class RemoteStartWriteToServerTask : public StoppableThread {
public:
explicit RemoteStartWriteToServerTask(
std::shared_ptr<ClientReaderWriter<RemoteStartRequest, RemoteStartResponse>> stream)
: m_stream(stream)
{
}
~RemoteStartWriteToServerTask() = default;
void run()
{
while (stopRequested() == false) {
int cmd;
cmd = getchar();
RemoteStartRequest request;
if (cmd == EOF) {
request.set_finish(true);
} else {
char in = (char)cmd;
request.set_stdin(&in, 1);
}
if (!m_stream->Write(request)) {
ERROR("Failed to write request to grpc server");
break;
}
if (cmd == EOF) {
break;
}
}
}
private:
std::shared_ptr<ClientReaderWriter<RemoteStartRequest, RemoteStartResponse>> m_stream;
};
/*
 * Client for the streaming "RemoteStart" RPC.
 * The request parameters travel as grpc metadata; stdin is streamed to the
 * server by a writer thread while stdout/stderr are read from the stream.
 */
class ContainerRemoteStart : public ClientBase<ContainerService, ContainerService::Stub, lcrc_start_request,
                                               RemoteStartRequest, lcrc_start_response, RemoteStartResponse> {
public:
    explicit ContainerRemoteStart(void *args)
        : ClientBase(args)
    {
    }
    ~ContainerRemoteStart() = default;

    // Attach the request parameters (user name taken from the TLS certificate,
    // tls mode, container id and attach flags) to the client context.
    // Returns -1 when the container id is missing or the common name cannot
    // be read from the certificate, 0 on success.
    int set_custom_header_metadata(ClientContext &context, const struct lcrc_start_request *request)
    {
        if (request == nullptr || request->name == nullptr) {
            ERROR("Missing container id in the request");
            return -1;
        }
        // Set common name from cert.perm
        char common_name_value[ClientBaseConstants::COMMON_NAME_LEN] = { 0 };
        int ret = get_common_name_from_tls_cert(m_certFile.c_str(), common_name_value,
                                                ClientBaseConstants::COMMON_NAME_LEN);
        if (ret != 0) {
            ERROR("Failed to get common name in: %s", m_certFile.c_str());
            return -1;
        }
        context.AddMetadata("username", std::string(common_name_value, strlen(common_name_value)));
        context.AddMetadata("tls_mode", m_tlsMode);
        context.AddMetadata("container-id", std::string(request->name));
        context.AddMetadata("attach-stdin", request->attach_stdin ? "true" : "false");
        context.AddMetadata("attach-stdout", request->attach_stdout ? "true" : "false");
        context.AddMetadata("attach-stderr", request->attach_stderr ? "true" : "false");
        return 0;
    }

    // Extract the "cc" (server return code) and "errmsg" entries from the
    // server trailing metadata into the response.
    // A "cc" value that cannot be parsed as a number is reported as
    // LCRD_ERR_EXEC instead of letting std::stoul's exception escape.
    void get_server_trailing_metadata(ClientContext &context, lcrc_start_response *response)
    {
        auto metadata = context.GetServerTrailingMetadata();
        auto cc = metadata.find("cc");
        if (cc != metadata.end()) {
            auto tmpstr = std::string(cc->second.data(), cc->second.length());
            try {
                response->server_errono = (uint32_t)std::stoul(tmpstr, nullptr, 0);
            } catch (const std::exception &e) {
                // std::stoul throws on non-numeric or out-of-range input
                ERROR("Invalid return code in server trailing metadata: %s", e.what());
                response->server_errono = LCRD_ERR_EXEC;
            }
        }
        auto errmsg = metadata.find("errmsg");
        if (errmsg != metadata.end()) {
            auto tmpstr = std::string(errmsg->second.data(), errmsg->second.length());
            response->errmsg = util_strdup_s(tmpstr.c_str());
        }
    }

    // Run the RemoteStart stream.
    // Returns 0 when response->cc is LCRD_SUCCESS at the end, -1 otherwise.
    int run(const struct lcrc_start_request *request, struct lcrc_start_response *response) override
    {
        ClientContext context;
        if (set_custom_header_metadata(context, request) != 0) {
            ERROR("Failed to translate request to grpc");
            response->cc = LCRD_ERR_INPUT;
            return -1;
        }

        using StreamStartRWSharedPtr = std::shared_ptr<ClientReaderWriter<RemoteStartRequest, RemoteStartResponse>>;
        StreamStartRWSharedPtr stream(stub_->RemoteStart(&context));

        // The stdin forwarding thread is only started when stdin is attached
        RemoteStartWriteToServerTask write_task(stream);
        std::thread writer;
        if (request->attach_stdin) {
            writer = std::thread([&]() {
                write_task.run();
            });
        }

        // Print server output until the stream ends or a finish message arrives
        RemoteStartResponse stream_response;
        if (request->attach_stdout || request->attach_stderr) {
            while (stream->Read(&stream_response)) {
                if (stream_response.finish()) {
                    break;
                }
                if (!stream_response.stdout().empty()) {
                    std::cout << stream_response.stdout() << std::flush;
                }
                if (!stream_response.stderr().empty()) {
                    std::cerr << stream_response.stderr() << std::flush;
                }
            }
        }

        write_task.stop();
        stream->WritesDone();

        Status status = stream->Finish();
        if (!status.ok()) {
            ERROR("error_code: %d: %s", status.error_code(), status.error_message().c_str());
            unpackStatus(status, response);
        } else {
            get_server_trailing_metadata(context, response);
            if (response->server_errono != LCRD_SUCCESS) {
                response->cc = LCRD_ERR_EXEC;
            }
        }

        // The writer may still be blocked in getchar(); cancel it before joining
        if (request->attach_stdin) {
            pthread_cancel(writer.native_handle());
            if (writer.joinable()) {
                writer.join();
            }
        }
        return (response->cc == LCRD_SUCCESS) ? 0 : -1;
    }
};
/*
 * Client for the ContainerService "Top" RPC.
 */
class ContainerTop : public ClientBase<ContainerService, ContainerService::Stub, lcrc_top_request, TopRequest,
                                       lcrc_top_response, TopResponse> {
public:
    explicit ContainerTop(void *args)
        : ClientBase(args)
    {
    }
    ~ContainerTop() = default;

    // Translate the C request (container name plus ps arguments) into a
    // grpc TopRequest. Returns -1 for a null request, 0 otherwise.
    int request_to_grpc(const lcrc_top_request *request, TopRequest *grequest) override
    {
        if (request == nullptr) {
            return -1;
        }

        if (const char *name = request->name) {
            grequest->set_id(name);
        }

        // The loop body never runs when ps_argc is not positive
        for (int idx = 0; idx < request->ps_argc; idx++) {
            grequest->add_args(request->ps_args[idx]);
        }
        return 0;
    }

    // Copy return code, error message, titles and the process lines into
    // the response. Returns -1 when the process array cannot be sized or
    // allocated, 0 otherwise.
    int response_from_grpc(TopResponse *gresponse, struct lcrc_top_response *response) override
    {
        const int count = gresponse->processes_size();

        // Status information is reported whether or not processes are listed
        response->server_errono = gresponse->cc();
        const auto &server_msg = gresponse->errmsg();
        if (!server_msg.empty()) {
            response->errmsg = util_strdup_s(server_msg.c_str());
        }

        if (count <= 0) {
            response->titles = nullptr;
            response->processes_len = 0;
            return 0;
        }

        const auto &titles = gresponse->titles();
        if (!titles.empty()) {
            response->titles = util_strdup_s(titles.c_str());
        }

        // Guard the size computation of the pointer array against overflow
        if (static_cast<size_t>(count) > SIZE_MAX / sizeof(char *)) {
            ERROR("Too many summary info!");
            return -1;
        }

        response->processes = static_cast<char **>(util_common_calloc_s(count * sizeof(char *)));
        if (response->processes == nullptr) {
            ERROR("out of memory");
            response->cc = LCRD_ERR_MEMOUT;
            return -1;
        }

        for (int idx = 0; idx < count; idx++) {
            response->processes[idx] = util_strdup_s(gresponse->processes(idx).c_str());
        }
        response->processes_len = static_cast<size_t>(gresponse->processes_size());
        return 0;
    }

    // The container id is mandatory; returns -1 when it is missing.
    int check_parameter(const TopRequest &req) override
    {
        if (!req.id().empty()) {
            return 0;
        }
        ERROR("Missing container name in the request");
        return -1;
    }

    // Perform the Top call through the service stub.
    Status grpc_call(ClientContext *context, const TopRequest &req, TopResponse *reply) override
    {
        return stub_->Top(context, req, reply);
    }
};
/*
 * Client for the ContainerService "Stop" RPC.
 */
class ContainerStop : public ClientBase<ContainerService, ContainerService::Stub, lcrc_stop_request, StopRequest,
                                        lcrc_stop_response, StopResponse> {
public:
    explicit ContainerStop(void *args)
        : ClientBase(args)
    {
    }
    ~ContainerStop() = default;

    // Translate the C request (name, force flag, timeout) into a grpc
    // StopRequest. Returns -1 for a null request, 0 otherwise.
    int request_to_grpc(const lcrc_stop_request *request, StopRequest *grequest) override
    {
        if (request == nullptr) {
            return -1;
        }

        if (const char *name = request->name) {
            grequest->set_id(name);
        }
        grequest->set_force(request->force);
        grequest->set_timeout(request->timeout);
        return 0;
    }

    // Copy the server return code and, when present, the error message.
    int response_from_grpc(StopResponse *gresponse, lcrc_stop_response *response) override
    {
        response->server_errono = gresponse->cc();

        const auto &server_msg = gresponse->errmsg();
        if (server_msg.empty()) {
            return 0;
        }
        response->errmsg = util_strdup_s(server_msg.c_str());
        return 0;
    }

    // The container id is mandatory; returns -1 when it is missing.
    int check_parameter(const StopRequest &req) override
    {
        if (!req.id().empty()) {
            return 0;
        }
        ERROR("Missing container name in the request");
        return -1;
    }

    // Perform the Stop call through the service stub.
    Status grpc_call(ClientContext *context, const StopRequest &req, StopResponse *reply) override
    {
        return stub_->Stop(context, req, reply);
    }
};
/*
 * Client for the ContainerService "Rename" RPC.
 */
class ContainerRename : public ClientBase<ContainerService, ContainerService::Stub, lcrc_rename_request, RenameRequest,
                                          lcrc_rename_response, RenameResponse> {
public:
    explicit ContainerRename(void *args)
        : ClientBase(args)
    {
    }
    ~ContainerRename() = default;

    // Translate the C request (old and new name) into a grpc RenameRequest.
    // Returns -1 for a null request, 0 otherwise.
    int request_to_grpc(const lcrc_rename_request *request, RenameRequest *grequest) override
    {
        if (request == nullptr) {
            return -1;
        }

        if (const char *current = request->old_name) {
            grequest->set_oldname(current);
        }
        if (const char *wanted = request->new_name) {
            grequest->set_newname(wanted);
        }
        return 0;
    }

    // Copy the server return code and, when present, the error message.
    int response_from_grpc(RenameResponse *gresponse, lcrc_rename_response *response) override
    {
        response->server_errono = gresponse->cc();

        const auto &server_msg = gresponse->errmsg();
        if (server_msg.empty()) {
            return 0;
        }
        response->errmsg = util_strdup_s(server_msg.c_str());
        return 0;
    }

    // Both names are mandatory; the old name is checked first.
    // Returns -1 on the first missing name, 0 when both are present.
    int check_parameter(const RenameRequest &req) override
    {
        if (req.oldname().empty()) {
            ERROR("Missing container old name in the request");
            return -1;
        }
        if (req.newname().empty()) {
            ERROR("Missing container new name in the request");
            return -1;
        }
        return 0;
    }

    // Perform the Rename call through the service stub.
    Status grpc_call(ClientContext *context, const RenameRequest &req, RenameResponse *reply) override
    {
        return stub_->Rename(context, req, reply);
    }
};
/*
 * Client for the ContainerService "Restart" RPC.
 */
class ContainerRestart : public ClientBase<ContainerService, ContainerService::Stub, lcrc_restart_request,
                                           RestartRequest, lcrc_restart_response, RestartResponse> {
public:
    explicit ContainerRestart(void *args)
        : ClientBase(args)
    {
    }
    ~ContainerRestart() = default;

    // Translate the C request (name and timeout) into a grpc RestartRequest.
    // Returns -1 for a null request, 0 otherwise.
    int request_to_grpc(const lcrc_restart_request *request, RestartRequest *grequest) override
    {
        if (request == nullptr) {
            return -1;
        }

        if (const char *name = request->name) {
            grequest->set_id(name);
        }
        // The grpc field is 32-bit signed; convert explicitly
        grequest->set_timeout(static_cast<int32_t>(request->timeout));
        return 0;
    }

    // Copy the server return code and, when present, the error message.
    int response_from_grpc(RestartResponse *gresponse, lcrc_restart_response *response) override
    {
        response->server_errono = gresponse->cc();

        const auto &server_msg = gresponse->errmsg();
        if (server_msg.empty()) {
            return 0;
        }
        response->errmsg = util_strdup_s(server_msg.c_str());
        return 0;
    }

    // The container id is mandatory; returns -1 when it is missing.
    int check_parameter(const RestartRequest &req) override
    {
        if (!req.id().empty()) {
            return 0;
        }
        ERROR("Missing container name in request");
        return -1;
    }

    // Perform the Restart call through the service stub.
    Status grpc_call(ClientContext *context, const RestartRequest &req, RestartResponse *reply) override
    {
        return stub_->Restart(context, req, reply);
    }
};
/*
 * Client for the ContainerService "Kill" RPC.
 */
class ContainerKill : public ClientBase<ContainerService, ContainerService::Stub, lcrc_kill_request, KillRequest,
                                        lcrc_kill_response, KillResponse> {
public:
    explicit ContainerKill(void *args)
        : ClientBase(args)
    {
    }
    ~ContainerKill() = default;

    // Translate the C request (name and signal) into a grpc KillRequest.
    // Returns -1 for a null request, 0 otherwise.
    int request_to_grpc(const lcrc_kill_request *request, KillRequest *grequest) override
    {
        if (request == nullptr) {
            return -1;
        }

        if (const char *name = request->name) {
            grequest->set_id(name);
        }
        grequest->set_signal(request->signal);
        return 0;
    }

    // Copy the server return code and, when present, the error message.
    int response_from_grpc(KillResponse *gresponse, lcrc_kill_response *response) override
    {
        response->server_errono = gresponse->cc();

        const auto &server_msg = gresponse->errmsg();
        if (server_msg.empty()) {
            return 0;
        }
        response->errmsg = util_strdup_s(server_msg.c_str());
        return 0;
    }

    // The container id is mandatory; returns -1 when it is missing.
    int check_parameter(const KillRequest &req) override
    {
        if (!req.id().empty()) {
            return 0;
        }
        ERROR("Missing container name in the request");
        return -1;
    }

    // Perform the Kill call through the service stub.
    Status grpc_call(ClientContext *context, const KillRequest &req, KillResponse *reply) override
    {
        return stub_->Kill(context, req, reply);
    }
};
/*
 * Client for the ContainerService "Exec" RPC.
 */
class ContainerExec : public ClientBase<ContainerService, ContainerService::Stub, lcrc_exec_request, ExecRequest,
                                        lcrc_exec_response, ExecResponse> {
public:
    explicit ContainerExec(void *args)
        : ClientBase(args)
    {
    }
    ~ContainerExec() = default;

    // Translate the C request into a grpc ExecRequest: container name, tty
    // and attach flags, optional stdio values, argv, environment and the
    // optional user. Returns -1 for a null request, 0 otherwise.
    int request_to_grpc(const lcrc_exec_request *request, ExecRequest *grequest) override
    {
        if (request == nullptr) {
            return -1;
        }
        if (request->name != nullptr) {
            grequest->set_container_id(request->name);
        }
        grequest->set_tty(request->tty);
        grequest->set_open_stdin(request->open_stdin);
        grequest->set_attach_stdin(request->attach_stdin);
        grequest->set_attach_stdout(request->attach_stdout);
        grequest->set_attach_stderr(request->attach_stderr);
        if (request->stdin != nullptr) {
            grequest->set_stdin(request->stdin);
        }
        if (request->stdout != nullptr) {
            grequest->set_stdout(request->stdout);
        }
        if (request->stderr != nullptr) {
            grequest->set_stderr(request->stderr);
        }
        // Command line of the process to execute
        for (int i = 0; i < request->argc; i++) {
            grequest->add_argv(request->argv[i]);
        }
        // Environment entries for the process
        for (size_t i = 0; i < request->env_len; i++) {
            grequest->add_env(request->env[i]);
        }
        if (request->user != nullptr) {
            grequest->set_user(request->user);
        }
        return 0;
    }

    // Copy the server return code, the exit code and, when present, the
    // error message into the response. Always returns 0.
    int response_from_grpc(ExecResponse *gresponse, lcrc_exec_response *response) override
    {
        response->server_errono = gresponse->cc();
        response->exit_code = gresponse->exit_code();
        if (!gresponse->errmsg().empty()) {
            response->errmsg = util_strdup_s(gresponse->errmsg().c_str());
        }
        return 0;
    }

    // The container id is mandatory; returns -1 when it is missing.
    int check_parameter(const ExecRequest &req) override
    {
        if (req.container_id().empty()) {
            ERROR("Missing container name in the request");
            return -1;
        }
        return 0;
    }

    // Perform the Exec call through the service stub.
    Status grpc_call(ClientContext *context, const ExecRequest &req, ExecResponse *reply) override
    {
        return stub_->Exec(context, req, reply);
    }
};
class RemoteExecWriteToServerTask : public StoppableThread {
public:
explicit RemoteExecWriteToServerTask(
std::shared_ptr<ClientReaderWriter<RemoteExecRequest, RemoteExecResponse>> stream)
: m_stream(stream)
{
}
~RemoteExecWriteToServerTask() = default;
void run()
{
while (stopRequested() == false) {
int cmd;
cmd = getchar();
RemoteExecRequest request;
if (cmd == EOF) {
request.set_finish(true);
} else {
char in = (char)cmd;
request.add_cmd(&in, 1);
}
if (!m_stream->Write(request)) {
ERROR("Failed to write request to grpc server");
break;
}
if (cmd == EOF) {
break;
}
}
}
private:
std::shared_ptr<ClientReaderWriter<RemoteExecRequest, RemoteExecResponse>> m_stream;
};
/*
 * Client for the streaming "RemoteExec" RPC.
 * The exec request is serialized to JSON and sent as grpc metadata; input is
 * streamed to the server by a writer thread while output is read from the
 * stream.
 */
class ContainerRemoteExec : public ClientBase<ContainerService, ContainerService::Stub, lcrc_exec_request,
                                              RemoteExecRequest, lcrc_exec_response, RemoteExecResponse> {
public:
    explicit ContainerRemoteExec(void *args)
        : ClientBase(args)
    {
    }
    ~ContainerRemoteExec() = default;

    // Attach the request to the client context as metadata: user name taken
    // from the TLS certificate, tls mode and the JSON form of the exec request.
    // Returns -1 when the container id is missing, the JSON cannot be
    // generated (response->errmsg is filled) or the common name cannot be
    // read; 0 on success.
    int set_custom_header_metadata(ClientContext &context, const struct lcrc_exec_request *request,
                                   struct lcrc_exec_response *response)
    {
        int ret = 0;
        char *json = nullptr;
        parser_error err = nullptr;
        container_exec_request exec = { 0 };
        struct parser_context ctx = { OPT_GEN_SIMPLIFY, 0 };
        // Set common name from cert.perm
        char common_name_value[ClientBaseConstants::COMMON_NAME_LEN] = { 0 };

        if (request == nullptr || request->name == nullptr) {
            ERROR("Missing container id in the request");
            return -1;
        }

        // exec only borrows the pointers of request; nothing in exec is freed here
        exec.container_id = request->name;
        exec.tty = request->tty;
        exec.attach_stdin = request->attach_stdin;
        exec.attach_stdout = request->attach_stdout;
        exec.attach_stderr = request->attach_stderr;
        exec.timeout = request->timeout;
        exec.argv = request->argv;
        exec.argv_len = (size_t)request->argc;
        exec.env = request->env;
        exec.env_len = request->env_len;

        json = container_exec_request_generate_json(&exec, &ctx, &err);
        if (json == nullptr) {
            format_errorf(&response->errmsg, "Can not generate json: %s", err);
            ret = -1;
            goto out;
        }

        ret = get_common_name_from_tls_cert(m_certFile.c_str(), common_name_value,
                                            ClientBaseConstants::COMMON_NAME_LEN);
        if (ret != 0) {
            ERROR("Failed to get common name in: %s", m_certFile.c_str());
            ret = -1;
            goto out;
        }
        context.AddMetadata("username", std::string(common_name_value, strlen(common_name_value)));
        context.AddMetadata("tls_mode", m_tlsMode);
        context.AddMetadata("isulad-remote-exec", json);
out:
        free(err);
        free(json);
        return ret;
    }

    // Extract "cc" (server return code), "exit_code" and "errmsg" from the
    // server trailing metadata into the response.
    // Values that cannot be parsed as numbers no longer let std::stoul's
    // exception escape: a bad "cc" is reported as LCRD_ERR_EXEC, a bad
    // "exit_code" is logged and the response field is left untouched.
    void get_server_trailing_metadata(ClientContext &context, lcrc_exec_response *response)
    {
        auto metadata = context.GetServerTrailingMetadata();
        auto cc = metadata.find("cc");
        if (cc != metadata.end()) {
            auto tmpstr = std::string(cc->second.data(), cc->second.length());
            try {
                response->server_errono = (uint32_t)std::stoul(tmpstr, nullptr, 0);
            } catch (const std::exception &e) {
                // std::stoul throws on non-numeric or out-of-range input
                ERROR("Invalid return code in server trailing metadata: %s", e.what());
                response->server_errono = LCRD_ERR_EXEC;
            }
        }
        auto exit_code = metadata.find("exit_code");
        if (exit_code != metadata.end()) {
            auto tmpstr = std::string(exit_code->second.data(), exit_code->second.length());
            try {
                response->exit_code = (uint32_t)std::stoul(tmpstr, nullptr, 0);
            } catch (const std::exception &e) {
                ERROR("Invalid exit code in server trailing metadata: %s", e.what());
            }
        }
        auto errmsg = metadata.find("errmsg");
        if (errmsg != metadata.end()) {
            auto tmpstr = std::string(errmsg->second.data(), errmsg->second.length());
            response->errmsg = util_strdup_s(tmpstr.c_str());
        }
    }

    // Run the RemoteExec stream.
    // Returns 0 when response->cc is LCRD_SUCCESS at the end, -1 otherwise.
    int run(const struct lcrc_exec_request *request, struct lcrc_exec_response *response) override
    {
        ClientContext context;
        if (set_custom_header_metadata(context, request, response) != 0) {
            ERROR("Failed to translate request to grpc");
            response->cc = LCRD_ERR_INPUT;
            return -1;
        }

        std::shared_ptr<ClientReaderWriter<RemoteExecRequest, RemoteExecResponse>> stream(stub_->RemoteExec(&context));

        // Forward local input to the server on a separate thread
        RemoteExecWriteToServerTask write_task(stream);
        std::thread writer([&]() {
            write_task.run();
        });

        // Print server output until the stream ends or a finish message arrives
        RemoteExecResponse stream_response;
        while (stream->Read(&stream_response)) {
            if (stream_response.finish()) {
                break;
            }
            std::cout << stream_response.stdout() << std::flush;
        }

        write_task.stop();
        stream->WritesDone();

        Status status = stream->Finish();
        if (!status.ok()) {
            ERROR("error_code: %d: %s", status.error_code(), status.error_message().c_str());
            unpackStatus(status, response);
        } else {
            get_server_trailing_metadata(context, response);
            if (response->server_errono != LCRD_SUCCESS) {
                response->cc = LCRD_ERR_EXEC;
            }
        }

        // The writer may still be blocked in getchar(); cancel it before joining
        pthread_cancel(writer.native_handle());
        if (writer.joinable()) {
            writer.join();
        }
        return (response->cc == LCRD_SUCCESS) ? 0 : -1;
    }
};
/*
 * Client for the ContainerService "Inspect" RPC.
 */
class ContainerInspect : public ClientBase<ContainerService, ContainerService::Stub, lcrc_inspect_request,
                                           InspectContainerRequest, lcrc_inspect_response, InspectContainerResponse> {
public:
    explicit ContainerInspect(void *args)
        : ClientBase(args)
    {
    }
    ~ContainerInspect() = default;

    // Translate the C request (name, bformat, timeout) into a grpc
    // InspectContainerRequest. Returns -1 for a null request, 0 otherwise.
    int request_to_grpc(const lcrc_inspect_request *request, InspectContainerRequest *grequest) override
    {
        if (request == nullptr) {
            return -1;
        }

        if (const char *name = request->name) {
            grequest->set_id(name);
        }
        grequest->set_bformat(request->bformat);
        grequest->set_timeout(request->timeout);
        return 0;
    }

    // Copy the server return code and the non-empty container JSON / error
    // message into the response. Always returns 0.
    int response_from_grpc(InspectContainerResponse *gresponse, lcrc_inspect_response *response) override
    {
        response->server_errono = gresponse->cc();

        const auto &container_json = gresponse->containerjson();
        if (!container_json.empty()) {
            response->json = util_strdup_s(container_json.c_str());
        }

        const auto &server_msg = gresponse->errmsg();
        if (!server_msg.empty()) {
            response->errmsg = util_strdup_s(server_msg.c_str());
        }
        return 0;
    }

    // The container id is mandatory; returns -1 when it is missing.
    int check_parameter(const InspectContainerRequest &req) override
    {
        if (!req.id().empty()) {
            return 0;
        }
        ERROR("Missing container name in the request");
        return -1;
    }

    // Perform the Inspect call through the service stub.
    Status grpc_call(ClientContext *context, const InspectContainerRequest &req,
                     InspectContainerResponse *reply) override
    {
        return stub_->Inspect(context, req, reply);
    }
};
/*
 * Client for the ContainerService "Delete" RPC.
 */
class ContainerDelete : public ClientBase<ContainerService, ContainerService::Stub, lcrc_delete_request, DeleteRequest,
                                          lcrc_delete_response, DeleteResponse> {
public:
    explicit ContainerDelete(void *args)
        : ClientBase(args)
    {
    }
    ~ContainerDelete() = default;

    // Translate the C request (name and force flag) into a grpc
    // DeleteRequest. Returns -1 for a null request, 0 otherwise.
    int request_to_grpc(const lcrc_delete_request *request, DeleteRequest *grequest) override
    {
        if (request == nullptr) {
            return -1;
        }

        if (const char *name = request->name) {
            grequest->set_id(name);
        }
        grequest->set_force(request->force);
        return 0;
    }

    // Copy the server return code and the non-empty id / error message into
    // the response. Always returns 0.
    int response_from_grpc(DeleteResponse *gresponse, lcrc_delete_response *response) override
    {
        response->server_errono = gresponse->cc();

        const auto &removed_id = gresponse->id();
        if (!removed_id.empty()) {
            response->name = util_strdup_s(removed_id.c_str());
        }

        const auto &server_msg = gresponse->errmsg();
        if (!server_msg.empty()) {
            response->errmsg = util_strdup_s(server_msg.c_str());
        }
        return 0;
    }

    // The container id is mandatory; returns -1 when it is missing.
    int check_parameter(const DeleteRequest &req) override
    {
        if (!req.id().empty()) {
            return 0;
        }
        ERROR("Missing container name in the request");
        return -1;
    }

    // Perform the Delete call through the service stub.
    Status grpc_call(ClientContext *context, const DeleteRequest &req, DeleteResponse *reply) override
    {
        return stub_->Delete(context, req, reply);
    }
};
/*
 * Client for the ContainerService "List" RPC.
 * Converts the returned containers into an array of
 * lcrc_container_summary_info entries.
 */
class ContainerList : public ClientBase<ContainerService, ContainerService::Stub, lcrc_list_request, ListRequest,
                                        lcrc_list_response, ListResponse> {
public:
    explicit ContainerList(void *args)
        : ClientBase(args)
    {
    }
    ~ContainerList() = default;

    // Translate the C request (optional key/value filters and the "all"
    // flag) into a grpc ListRequest. Returns -1 for a null request, 0 otherwise.
    int request_to_grpc(const lcrc_list_request *request, ListRequest *grequest) override
    {
        if (request == nullptr) {
            return -1;
        }
        if (request->filters != nullptr) {
            google::protobuf::Map<std::string, std::string> *map = grequest->mutable_filters();
            for (size_t i = 0; i < request->filters->len; i++) {
                (*map)[request->filters->keys[i]] = request->filters->values[i];
            }
        }
        grequest->set_all(request->all);
        return 0;
    }

    // Fill the response from the grpc reply.
    // The server return code and error message are copied whether or not any
    // container is listed. Returns -1 when the summary array cannot be sized
    // or allocated, 0 otherwise.
    int response_from_grpc(ListResponse *gresponse, lcrc_list_response *response) override
    {
        const int num = gresponse->containers_size();

        // Server status must be reported on every path, not only for empty lists
        response->server_errono = gresponse->cc();
        if (!gresponse->errmsg().empty()) {
            response->errmsg = util_strdup_s(gresponse->errmsg().c_str());
        }

        if (num <= 0) {
            response->container_summary = nullptr;
            response->container_num = 0;
            return 0;
        }

        // Guard the size computation of the pointer array against overflow
        if ((size_t)num > SIZE_MAX / sizeof(lcrc_container_summary_info *)) {
            ERROR("Too many summary info!");
            return -1;
        }
        response->container_summary = (struct lcrc_container_summary_info **)util_common_calloc_s(
                                          sizeof(struct lcrc_container_summary_info *) * (size_t)num);
        if (response->container_summary == nullptr) {
            ERROR("out of memory");
            response->cc = LCRD_ERR_MEMOUT;
            return -1;
        }

        for (int i = 0; i < num; i++) {
            if (get_container_summary_from_grpc(response, gresponse, i)) {
                return -1;
            }
        }
        return 0;
    }

    // Perform the List call through the service stub.
    Status grpc_call(ClientContext *context, const ListRequest &req, ListResponse *reply) override
    {
        return stub_->List(context, req, reply);
    }

private:
    // Allocate and fill the summary entry at position index from the
    // matching container of the reply, then increment container_num.
    // Empty id/name/command/start/finish values become "-", an empty image
    // becomes "none"; runtime and health_state stay null when empty.
    // Returns -1 when the entry cannot be allocated, 0 otherwise.
    int get_container_summary_from_grpc(lcrc_list_response *response, ListResponse *gresponse, int index)
    {
        struct lcrc_container_summary_info *summary =
            (struct lcrc_container_summary_info *)util_common_calloc_s(sizeof(struct lcrc_container_summary_info));
        response->container_summary[index] = summary;
        if (summary == nullptr) {
            ERROR("out of memory");
            response->cc = LCRD_ERR_MEMOUT;
            return -1;
        }

        const Container &in = gresponse->containers(index);

        const char *id = !in.id().empty() ? in.id().c_str() : "-";
        summary->id = util_strdup_s(id);
        const char *name = !in.name().empty() ? in.name().c_str() : "-";
        summary->name = util_strdup_s(name);
        summary->runtime = !in.runtime().empty() ? util_strdup_s(in.runtime().c_str()) : nullptr;

        summary->has_pid = (int)in.pid() != 0;
        summary->pid = (uint32_t)in.pid();
        summary->status = (Container_Status)in.status();

        summary->image = !in.image().empty() ? util_strdup_s(in.image().c_str()) : util_strdup_s("none");
        summary->command = !in.command().empty() ? util_strdup_s(in.command().c_str()) : util_strdup_s("-");

        const char *starttime = !in.startat().empty() ? in.startat().c_str() : "-";
        summary->startat = util_strdup_s(starttime);
        const char *finishtime = !in.finishat().empty() ? in.finishat().c_str() : "-";
        summary->finishat = util_strdup_s(finishtime);

        summary->exit_code = in.exit_code();
        summary->restart_count = (uint32_t)(in.restartcount());
        summary->created = (int64_t)in.created();

        // The health state is stored wrapped in parentheses
        std::string healthState { "" };
        if (!in.health_state().empty()) {
            healthState = "(" + in.health_state() + ")";
        }
        summary->health_state = !healthState.empty() ? util_strdup_s(healthState.c_str()) : nullptr;

        response->container_num++;
        return 0;
    }
};
/*
 * Client for the ContainerService "Wait" RPC.
 */
class ContainerWait : public ClientBase<ContainerService, ContainerService::Stub, lcrc_wait_request, WaitRequest,
                                        lcrc_wait_response, WaitResponse> {
public:
    explicit ContainerWait(void *args)
        : ClientBase(args)
    {
    }
    ~ContainerWait() = default;

    // Translate the C request (id and wait condition) into a grpc
    // WaitRequest. Returns -1 for a null request, 0 otherwise.
    int request_to_grpc(const lcrc_wait_request *request, WaitRequest *grequest) override
    {
        if (request == nullptr) {
            return -1;
        }

        if (const char *container_id = request->id) {
            grequest->set_id(container_id);
        }
        grequest->set_condition(request->condition);
        return 0;
    }

    // Copy the exit code, the server return code and, when present, the
    // error message into the response. Always returns 0.
    int response_from_grpc(WaitResponse *gresponse, lcrc_wait_response *response) override
    {
        response->exit_code = static_cast<int>(gresponse->exit_code());
        response->server_errono = gresponse->cc();

        const auto &server_msg = gresponse->errmsg();
        if (server_msg.empty()) {
            return 0;
        }
        response->errmsg = util_strdup_s(server_msg.c_str());
        return 0;
    }

    // The container id is mandatory; returns -1 when it is missing.
    int check_parameter(const WaitRequest &req) override
    {
        if (!req.id().empty()) {
            return 0;
        }
        ERROR("Missing container name in the request");
        return -1;
    }

    // Perform the Wait call through the service stub.
    Status grpc_call(ClientContext *context, const WaitRequest &req, WaitResponse *reply) override
    {
        return stub_->Wait(context, req, reply);
    }
};
class AttachWriteToServerTask : public StoppableThread {
public:
explicit AttachWriteToServerTask(std::shared_ptr<ClientReaderWriter<AttachRequest, AttachResponse>> stream)
: m_stream(stream)
{
}
~AttachWriteToServerTask() = default;
void run()
{
while (stopRequested() == false) {
int cmd;
cmd = getchar();
AttachRequest request;
if (cmd == EOF) {
request.set_finish(true);
} else {
char in = (char)cmd;
request.set_stdin(&in, 1);
}
if (!m_stream->Write(request)) {
ERROR("Failed to write request to grpc server");
break;
}
if (cmd == EOF) {
break;
}
}
}
private:
std::shared_ptr<ClientReaderWriter<AttachRequest, AttachResponse>> m_stream;
};
// Client for the bidirectional Attach stream. Request parameters travel as
// call metadata; stdin bytes are written by a helper thread while this thread
// prints the stdout/stderr chunks received from the server.
class ContainerAttach : public ClientBase<ContainerService, ContainerService::Stub, lcrc_attach_request, AttachRequest,
                                          lcrc_attach_response, AttachResponse> {
public:
    explicit ContainerAttach(void *args)
        : ClientBase(args)
    {
    }
    ~ContainerAttach() = default;

    // Encode the attach request as call metadata ("username", "tls_mode",
    // "container-id", "attach-stdin/stdout/stderr").
    // Returns -1 when the request or its name is missing, or when the common
    // name cannot be read from the certificate file; 0 otherwise.
    int set_custom_header_metadata(ClientContext &context, const struct lcrc_attach_request *request)
    {
        if (request == nullptr || request->name == nullptr) {
            ERROR("Missing container id in the request");
            return -1;
        }
        // Set common name from the TLS certificate
        char common_name_value[ClientBaseConstants::COMMON_NAME_LEN] = { 0 };
        int ret = get_common_name_from_tls_cert(m_certFile.c_str(), common_name_value,
                                                ClientBaseConstants::COMMON_NAME_LEN);
        if (ret != 0) {
            ERROR("Failed to get common name in: %s", m_certFile.c_str());
            return -1;
        }
        context.AddMetadata("username", std::string(common_name_value, strlen(common_name_value)));
        context.AddMetadata("tls_mode", m_tlsMode);
        context.AddMetadata("container-id", std::string(request->name));
        context.AddMetadata("attach-stdin", request->attach_stdin ? "true" : "false");
        context.AddMetadata("attach-stdout", request->attach_stdout ? "true" : "false");
        context.AddMetadata("attach-stderr", request->attach_stderr ? "true" : "false");
        return 0;
    }

    // Read the "cc" (numeric error code) and "errmsg" entries from the
    // server's trailing metadata into response, when they are present.
    void get_server_trailing_metadata(ClientContext &context, lcrc_attach_response *response)
    {
        auto metadata = context.GetServerTrailingMetadata();
        auto cc = metadata.find("cc");
        if (cc != metadata.end()) {
            auto tmpstr = std::string(cc->second.data(), cc->second.length());
            response->server_errono = (uint32_t)std::stoul(tmpstr, nullptr, 0);
        }
        auto errmsg = metadata.find("errmsg");
        if (errmsg != metadata.end()) {
            auto tmpstr = std::string(errmsg->second.data(), errmsg->second.length());
            response->errmsg = util_strdup_s(tmpstr.c_str());
        }
    }

    // Run the attach session. Returns 0 when response->cc ends up as
    // LCRD_SUCCESS, -1 otherwise.
    int run(const struct lcrc_attach_request *request, struct lcrc_attach_response *response) override
    {
        ClientContext context;
        if (set_custom_header_metadata(context, request) != 0) {
            ERROR("Failed to translate request to grpc");
            response->cc = LCRD_ERR_INPUT;
            return -1;
        }

        std::shared_ptr<ClientReaderWriter<AttachRequest, AttachResponse>> stream(stub_->Attach(&context));
        AttachWriteToServerTask write_task(stream);

        // Only forward local stdin when the caller asked for it. The thread
        // object stays empty (not joinable) otherwise, so it can always be
        // destroyed safely at the end of this function.
        std::thread writer;
        if (request->attach_stdin) {
            writer = std::thread([&write_task]() {
                write_task.run();
            });
        }

        // Container output must be displayed regardless of whether stdin is attached.
        AttachResponse stream_response;
        while (stream->Read(&stream_response)) {
            if (stream_response.finish()) {
                break;
            }
            if (!stream_response.stdout().empty()) {
                std::cout << stream_response.stdout() << std::flush;
            }
            if (!stream_response.stderr().empty()) {
                std::cerr << stream_response.stderr() << std::flush;
            }
        }

        write_task.stop();
        stream->WritesDone();
        Status status = stream->Finish();
        if (!status.ok()) {
            ERROR("error_code: %d: %s", status.error_code(), status.error_message().c_str());
            unpackStatus(status, response);
        } else {
            get_server_trailing_metadata(context, response);
            if (response->server_errono != LCRD_SUCCESS) {
                response->cc = LCRD_ERR_EXEC;
            }
        }

        // The writer may still be blocked in getchar(); cancel it before
        // joining. A joinable std::thread must never reach its destructor.
        if (writer.joinable()) {
            pthread_cancel(writer.native_handle());
            writer.join();
        }
        return (response->cc == LCRD_SUCCESS) ? 0 : -1;
    }
};
// Client-side adapter for the ContainerService Pause RPC.
class ContainerPause : public ClientBase<ContainerService, ContainerService::Stub, lcrc_pause_request, PauseRequest,
                                         lcrc_pause_response, PauseResponse> {
public:
    explicit ContainerPause(void *args)
        : ClientBase(args)
    {
    }
    ~ContainerPause() = default;

    // Put the container name (when present) into the protobuf id field.
    // Returns -1 when request is null, 0 otherwise.
    int request_to_grpc(const lcrc_pause_request *request, PauseRequest *grequest) override
    {
        if (request == nullptr) {
            return -1;
        }

        const char *container_name = request->name;
        if (container_name != nullptr) {
            grequest->set_id(container_name);
        }
        return 0;
    }

    // Copy the server error code and (non-empty) error message into response.
    int response_from_grpc(PauseResponse *gresponse, lcrc_pause_response *response) override
    {
        response->server_errono = gresponse->cc();

        const std::string &message = gresponse->errmsg();
        if (!message.empty()) {
            response->errmsg = util_strdup_s(message.c_str());
        }
        return 0;
    }

    // The request is only valid when it names a container.
    int check_parameter(const PauseRequest &req) override
    {
        const bool has_id = !req.id().empty();
        if (has_id) {
            return 0;
        }
        ERROR("Missing container name in the request");
        return -1;
    }

    // Issue the unary Pause call on the stub.
    Status grpc_call(ClientContext *context, const PauseRequest &req, PauseResponse *reply) override
    {
        return stub_->Pause(context, req, reply);
    }
};
// Client-side adapter for the ContainerService Resume RPC.
class ContainerResume : public ClientBase<ContainerService, ContainerService::Stub, lcrc_resume_request, ResumeRequest,
                                          lcrc_resume_response, ResumeResponse> {
public:
    explicit ContainerResume(void *args)
        : ClientBase(args)
    {
    }
    ~ContainerResume() = default;

    // Put the container name (when present) into the protobuf id field.
    // Returns -1 when request is null, 0 otherwise.
    int request_to_grpc(const lcrc_resume_request *request, ResumeRequest *grequest) override
    {
        if (request == nullptr) {
            return -1;
        }

        const char *container_name = request->name;
        if (container_name != nullptr) {
            grequest->set_id(container_name);
        }
        return 0;
    }

    // Copy the server error code and (non-empty) error message into response.
    int response_from_grpc(ResumeResponse *gresponse, lcrc_resume_response *response) override
    {
        response->server_errono = gresponse->cc();

        const std::string &message = gresponse->errmsg();
        if (!message.empty()) {
            response->errmsg = util_strdup_s(message.c_str());
        }
        return 0;
    }

    // The request is only valid when it names a container.
    int check_parameter(const ResumeRequest &req) override
    {
        const bool has_id = !req.id().empty();
        if (has_id) {
            return 0;
        }
        ERROR("Missing container name in the request");
        return -1;
    }

    // Issue the unary Resume call on the stub.
    Status grpc_call(ClientContext *context, const ResumeRequest &req, ResumeResponse *reply) override
    {
        return stub_->Resume(context, req, reply);
    }
};
// Client-side adapter for the ContainerService Export RPC.
class ContainerExport : public ClientBase<ContainerService, ContainerService::Stub, lcrc_export_request, ExportRequest,
                                          lcrc_export_response, ExportResponse> {
public:
    explicit ContainerExport(void *args)
        : ClientBase(args)
    {
    }
    ~ContainerExport() = default;

    // Copy the container name and output file (each only when present) into grequest.
    // Returns -1 when request is null, 0 otherwise.
    int request_to_grpc(const lcrc_export_request *request, ExportRequest *grequest) override
    {
        if (request == nullptr) {
            return -1;
        }

        const char *container_name = request->name;
        const char *output_file = request->file;
        if (container_name != nullptr) {
            grequest->set_id(container_name);
        }
        if (output_file != nullptr) {
            grequest->set_file(output_file);
        }
        return 0;
    }

    // Copy the server error code and (non-empty) error message into response.
    int response_from_grpc(ExportResponse *gresponse, lcrc_export_response *response) override
    {
        response->server_errono = gresponse->cc();

        const std::string &message = gresponse->errmsg();
        if (!message.empty()) {
            response->errmsg = util_strdup_s(message.c_str());
        }
        return 0;
    }

    // Both the container id and the output file path are mandatory.
    int check_parameter(const ExportRequest &req) override
    {
        if (req.id().empty()) {
            ERROR("Missing container name in the request");
            return -1;
        }
        if (!req.file().empty()) {
            return 0;
        }
        ERROR("Missing output file path in the request");
        return -1;
    }

    // Issue the unary Export call on the stub.
    Status grpc_call(ClientContext *context, const ExportRequest &req, ExportResponse *reply) override
    {
        return stub_->Export(context, req, reply);
    }
};
// Client-side adapter for the ContainerService Update RPC. The update
// settings are sent to the server as a host-config JSON document.
class ContainerUpdate : public ClientBase<ContainerService, ContainerService::Stub, lcrc_update_request, UpdateRequest,
                                          lcrc_update_response, UpdateResponse> {
public:
    explicit ContainerUpdate(void *args)
        : ClientBase(args)
    {
    }
    ~ContainerUpdate() = default;

    // Build the host-config JSON from request->updateconfig (restart policy
    // and cr) and store it, together with the container name, in grequest.
    // Returns -1 when request is null or the JSON cannot be generated.
    int request_to_grpc(const lcrc_update_request *request, UpdateRequest *grequest) override
    {
        if (request == nullptr) {
            return -1;
        }

        lcrc_host_config_t hostconfig;
        (void)memset(&hostconfig, 0, sizeof(hostconfig));
        if (request->updateconfig != nullptr) {
            hostconfig.restart_policy = request->updateconfig->restart_policy;
            hostconfig.cr = request->updateconfig->cr;
        }

        char *json = nullptr;
        // Treat a "successful" call that produced no JSON as a failure too:
        // handing a null pointer to set_hostconfig() would be undefined behavior.
        if (generate_hostconfig(&hostconfig, &json) != 0 || json == nullptr) {
            ERROR("Failed to generate hostconfig json");
            free(json);
            return -1;
        }
        grequest->set_hostconfig(json);
        free(json);

        if (request->name != nullptr) {
            grequest->set_id(request->name);
        }
        return 0;
    }

    // Copy the server error code and (non-empty) error message into response.
    int response_from_grpc(UpdateResponse *gresponse, lcrc_update_response *response) override
    {
        response->server_errono = gresponse->cc();
        if (!gresponse->errmsg().empty()) {
            response->errmsg = util_strdup_s(gresponse->errmsg().c_str());
        }
        return 0;
    }

    // The request is only valid when it names a container.
    int check_parameter(const UpdateRequest &req) override
    {
        if (req.id().empty()) {
            ERROR("Missing container name in the request");
            return -1;
        }
        return 0;
    }

    // Issue the unary Update call on the stub.
    Status grpc_call(ClientContext *context, const UpdateRequest &req, UpdateResponse *reply) override
    {
        return stub_->Update(context, req, reply);
    }
};
// Client-side adapter for the ContainerService Container_conf RPC, which
// returns the log path, log rotate count and log size of a container.
class ContainerConf : public ClientBase<ContainerService, ContainerService::Stub, lcrc_container_conf_request,
                                        Container_conf_Request, lcrc_container_conf_response, Container_conf_Response> {
public:
    explicit ContainerConf(void *args)
        : ClientBase(args)
    {
    }
    ~ContainerConf() = default;

    // Put the container name (when present) into the protobuf container_id field.
    // Returns -1 when request is null, 0 otherwise.
    int request_to_grpc(const lcrc_container_conf_request *request, Container_conf_Request *grequest) override
    {
        if (request == nullptr) {
            return -1;
        }

        const char *container_name = request->name;
        if (container_name != nullptr) {
            grequest->set_container_id(container_name);
        }
        return 0;
    }

    // Copy error information and log settings into response; string fields
    // are duplicated only when the server sent a non-empty value.
    int response_from_grpc(Container_conf_Response *gresponse, lcrc_container_conf_response *response) override
    {
        response->server_errono = gresponse->cc();

        const std::string &message = gresponse->errmsg();
        if (!message.empty()) {
            response->errmsg = util_strdup_s(message.c_str());
        }

        const std::string &log_path = gresponse->container_logpath();
        if (!log_path.empty()) {
            response->container_logpath = util_strdup_s(log_path.c_str());
        }

        response->container_logrotate = gresponse->container_logrotate();

        const std::string &log_size = gresponse->container_logsize();
        if (!log_size.empty()) {
            response->container_logsize = util_strdup_s(log_size.c_str());
        }
        return 0;
    }

    // The request is only valid when it names a container.
    int check_parameter(const Container_conf_Request &req) override
    {
        const bool has_id = !req.container_id().empty();
        if (has_id) {
            return 0;
        }
        ERROR("Missing container name in the request");
        return -1;
    }

    // Issue the unary Container_conf call on the stub.
    Status grpc_call(ClientContext *context, const Container_conf_Request &req, Container_conf_Response *reply) override
    {
        return stub_->Container_conf(context, req, reply);
    }
};
// Client-side adapter for the ContainerService Stats RPC.
class ContainerStats : public ClientBase<ContainerService, ContainerService::Stub, lcrc_stats_request, StatsRequest,
                                         lcrc_stats_response, StatsResponse> {
public:
    explicit ContainerStats(void *args)
        : ClientBase(args)
    {
    }
    ~ContainerStats() = default;

    // Copy runtime, the list of container names and the "all" flag into grequest.
    // Returns -1 when request is null, 0 otherwise.
    int request_to_grpc(const lcrc_stats_request *request, StatsRequest *grequest) override
    {
        if (request == nullptr) {
            return -1;
        }

        if (request->runtime != nullptr) {
            grequest->set_runtime(request->runtime);
        }
        if (request->containers != nullptr) {
            for (size_t idx = 0; idx < request->containers_len; idx++) {
                grequest->add_containers(request->containers[idx]);
            }
        }
        grequest->set_all(request->all);
        return 0;
    }

    // Allocate response->container_stats and fill one entry per container
    // returned by the server, then copy the error code and message.
    // Returns -1 when the array cannot be allocated.
    int response_from_grpc(StatsResponse *gresponse, lcrc_stats_response *response) override
    {
        const int count = gresponse->containers_size();
        if (count > 0) {
            response->container_stats =
                static_cast<lcrc_container_info *>(util_common_calloc_s(count * sizeof(struct lcrc_container_info)));
            if (response->container_stats == nullptr) {
                ERROR("Out of memory");
                return -1;
            }

            for (int idx = 0; idx < count; idx++) {
                const auto &src = gresponse->containers(idx);
                auto &dst = response->container_stats[idx];

                if (!src.id().empty()) {
                    dst.id = util_strdup_s(src.id().c_str());
                }
                // A pid of -1 marks "no pid available".
                dst.has_pid = (int)src.pid() != -1;
                dst.pid = (uint32_t)src.pid();
                dst.status = (Container_Status)((int)src.status());
                dst.pids_current = src.pids_current();
                dst.cpu_use_nanos = src.cpu_use_nanos();
                dst.cpu_system_use = src.cpu_system_use();
                dst.online_cpus = src.online_cpus();
                dst.blkio_read = src.blkio_read();
                dst.blkio_write = src.blkio_write();
                dst.mem_used = src.mem_used();
                dst.mem_limit = src.mem_limit();
                dst.kmem_used = src.kmem_used();
                dst.kmem_limit = src.kmem_limit();
            }
            response->container_num = (size_t)count;
        }

        response->server_errono = gresponse->cc();
        const std::string &message = gresponse->errmsg();
        if (!message.empty()) {
            response->errmsg = util_strdup_s(message.c_str());
        }
        return 0;
    }

    // The request is only valid when it names a runtime.
    int check_parameter(const StatsRequest &req) override
    {
        const bool has_runtime = !req.runtime().empty();
        if (has_runtime) {
            return 0;
        }
        ERROR("Missing runtime in the request");
        return -1;
    }

    // Issue the unary Stats call on the stub.
    Status grpc_call(ClientContext *context, const StatsRequest &req, StatsResponse *reply) override
    {
        return stub_->Stats(context, req, reply);
    }
};
// Client for the server-streaming Events RPC: every event received from the
// server is converted to a container_events_format_t and handed to the
// caller-supplied callback.
class ContainerEvents : public ClientBase<ContainerService, ContainerService::Stub, lcrc_events_request, EventsRequest,
                                          lcrc_events_response, Event> {
public:
    explicit ContainerEvents(void *args)
        : ClientBase(args)
    {
    }
    ~ContainerEvents() = default;

    // Stream events until the server closes the stream, invoking request->cb
    // (when set) for each one. Returns 0 on success, -1 when the request
    // cannot be translated, the stream ends with a non-OK status, or
    // response->cc is not LCRD_SUCCESS.
    int run(const struct lcrc_events_request *request, struct lcrc_events_response *response) override
    {
        int ret;
        EventsRequest req;
        Event event;
        ClientContext context;
        Status status;
        container_events_format_t lcrc_event;

        ret = events_request_to_grpc(request, &req);
        if (ret != 0) {
            ERROR("Failed to translate request to grpc");
            response->server_errono = LCRD_ERR_INPUT;
            return -1;
        }

        std::unique_ptr<ClientReader<Event>> reader(stub_->Events(&context, req));
        while (reader->Read(&event)) {
            // lcrc_event borrows its id string from `event`, so it is only
            // valid for the duration of the callback.
            event_from_grpc(&lcrc_event, &event);
            if (request->cb != nullptr) {
                request->cb(&lcrc_event);
            }
        }

        status = reader->Finish();
        if (!status.ok()) {
            ERROR("error_code: %d: %s", status.error_code(), status.error_message().c_str());
            unpackStatus(status, response);
            return -1;
        }
        if (response->server_errono != LCRD_SUCCESS) {
            response->cc = LCRD_ERR_EXEC;
        }
        return (response->cc == LCRD_SUCCESS) ? 0 : -1;
    }

private:
    // Copy seconds/nanos from the C timestamp into the protobuf Timestamp.
    void protobuf_timestamp_to_grpc(const types_timestamp_t *timestamp, Timestamp *gtimestamp)
    {
        gtimestamp->set_seconds(timestamp->seconds);
        gtimestamp->set_nanos(timestamp->nanos);
    }

    // Copy seconds/nanos from the protobuf Timestamp; each has_* flag is set
    // only when the corresponding value is non-zero.
    void protobuf_timestamp_from_grpc(types_timestamp_t *timestamp, const Timestamp &gtimestamp)
    {
        timestamp->has_seconds = gtimestamp.seconds() != 0;
        timestamp->seconds = gtimestamp.seconds();
        timestamp->has_nanos = gtimestamp.nanos() != 0;
        timestamp->nanos = gtimestamp.nanos();
    }

    // Reset *event and fill it from gevent. event->id points into gevent's
    // own string storage (no copy is made), so gevent must outlive any use
    // of event.
    void event_from_grpc(container_events_format_t *event, Event *gevent)
    {
        (void)memset(event, 0, sizeof(*event));
        if (!gevent->id().empty()) {
            event->id = (char *)gevent->id().c_str();
        }
        event->has_type = true;
        event->type = (container_events_type_t)((int)gevent->type());
        // A pid of -1 marks "no pid available".
        event->has_pid = (int)gevent->pid() != -1;
        event->pid = (uint32_t)gevent->pid();
        event->has_exit_status = true;
        event->exit_status = gevent->exit_status();
        if (gevent->has_timestamp()) {
            protobuf_timestamp_from_grpc(&event->timestamp, gevent->timestamp());
        }
    }

    // Translate the C events request into the protobuf request. The since /
    // until bounds are only sent when at least one of their has_* flags is set.
    // Returns -1 when request is null, 0 otherwise.
    int events_request_to_grpc(const struct lcrc_events_request *request, EventsRequest *grequest)
    {
        if (request == nullptr) {
            return -1;
        }
        grequest->set_storeonly(request->storeonly);
        if (request->id != nullptr) {
            grequest->set_id(request->id);
        }
        if (request->since.has_seconds || request->since.has_nanos) {
            protobuf_timestamp_to_grpc((const types_timestamp_t *)(&request->since), grequest->mutable_since());
        }
        if (request->until.has_seconds || request->until.has_nanos) {
            protobuf_timestamp_to_grpc((const types_timestamp_t *)(&request->until), grequest->mutable_until());
        }
        return 0;
    }
};
// State shared between CopyFromContainer::run and the read/close callbacks
// handed to the caller: the request, the call context and the stream reader.
struct CopyFromContainerContext {
    CopyFromContainerRequest request;
    ClientContext context;
    // Raw pointer because it is passed through a C-style callback context;
    // initialised to null so a freshly created context never holds a wild pointer.
    ClientReader<CopyFromContainerResponse> *reader = nullptr;
};
// Note: len of buf can not smaller than ARCHIVE_BLOCK_SIZE
static ssize_t CopyFromContainerRead(void *context, void *buf, size_t len)
{
CopyFromContainerResponse res;
struct CopyFromContainerContext *gcopy = (struct CopyFromContainerContext *)context;
if (!gcopy->reader->Read(&res)) {
return -1;
}
size_t data_len = res.data().length();
if (data_len <= len) {
(void)memcpy(buf, res.data().c_str(), data_len);
2019-09-30 10:53:41 -04:00
return (ssize_t)data_len;
}
return -1;
}
static int CopyFromContainerFinish(void *context, char **err)
{
struct CopyFromContainerContext *gcopy = (struct CopyFromContainerContext *)context;
CopyFromContainerResponse res;
if (gcopy->reader->Read(&res)) {
// Connection still alive, cancel it
gcopy->context.TryCancel();
gcopy->reader->Finish();
} else {
Status status = gcopy->reader->Finish();
if (!status.ok()) {
ERROR("error_code: %d: %s", status.error_code(), status.error_message().c_str());
if (!status.error_message().empty() &&
(status.error_code() == StatusCode::UNKNOWN || status.error_code() == StatusCode::PERMISSION_DENIED ||
status.error_code() == grpc::StatusCode::INTERNAL)) {
*err = util_strdup_s(status.error_message().c_str());
} else {
*err = util_strdup_s(errno_to_error_message(LCRD_ERR_CONNECT));
}
return -1;
}
}
delete gcopy->reader;
delete gcopy;
return 0;
}
// Client for the server-streaming CopyFromContainer RPC. run() opens the
// stream, parses the path stat sent in the initial metadata and then hands
// the stream to the caller through response->reader (read/close callbacks).
class CopyFromContainer
    : public ClientBase<ContainerService, ContainerService::Stub, lcrc_copy_from_container_request,
                        CopyFromContainerRequest, lcrc_copy_from_container_response, CopyFromContainerResponse> {
public:
    explicit CopyFromContainer(void *args)
        : ClientBase(args)
    {
    }
    ~CopyFromContainer() = default;

    // Start the copy. On success (return 0) response->stat holds the parsed
    // path stat and response->reader owns the stream context; the caller must
    // invoke response->reader.close to release it. Returns -1 on any failure.
    int run(const struct lcrc_copy_from_container_request *request,
            struct lcrc_copy_from_container_response *response) override
    {
        int ret;
        CopyFromContainerResponse res;

        struct CopyFromContainerContext *ctx = new (std::nothrow)(struct CopyFromContainerContext);
        if (ctx == nullptr) {
            ERROR("Out of memory");
            return -1;
        }

        ret = copy_from_container_request_to_grpc(request, &ctx->request);
        if (ret != 0) {
            ERROR("Failed to translate request to grpc");
            response->server_errono = LCRD_ERR_INPUT;
            delete ctx;
            return -1;
        }

        // Set common name from the TLS certificate
        char common_name_value[ClientBaseConstants::COMMON_NAME_LEN] = { 0 };
        ret = get_common_name_from_tls_cert(m_certFile.c_str(), common_name_value,
                                            ClientBaseConstants::COMMON_NAME_LEN);
        if (ret != 0) {
            ERROR("Failed to get common name in: %s", m_certFile.c_str());
            // No stream has been opened yet, so the context is still ours to free.
            delete ctx;
            return -1;
        }
        ctx->context.AddMetadata("username", std::string(common_name_value, strlen(common_name_value)));
        ctx->context.AddMetadata("tls_mode", m_tlsMode);

        auto reader = stub_->CopyFromContainer(&ctx->context, ctx->request);
        reader->WaitForInitialMetadata();
        // From here on the context owns the reader; CopyFromContainerFinish releases both.
        ctx->reader = reader.release();

        auto metadata = ctx->context.GetServerInitialMetadata();
        auto stat = metadata.find("isulad-container-path-stat");
        if (stat == metadata.end()) {
            CopyFromContainerFinish(ctx, &response->errmsg);
            return -1;
        }

        char *err = nullptr;
        std::string json = std::string(stat->second.data(), stat->second.length());
        response->stat = container_path_stat_parse_data(json.c_str(), nullptr, &err);
        if (response->stat == nullptr) {
            ERROR("Invalid json: %s", err);
            free(err);
            CopyFromContainerFinish(ctx, &response->errmsg);
            return -1;
        }
        free(err);

        // Ignore the first message, which is only used to carry the metadata
        ctx->reader->Read(&res);

        response->reader.context = (void *)ctx;
        response->reader.read = CopyFromContainerRead;
        response->reader.close = CopyFromContainerFinish;
        return 0;
    }

private:
    // Copy runtime, id and srcpath (each only when present) into grequest.
    // Returns -1 when request is null, 0 otherwise.
    int copy_from_container_request_to_grpc(const struct lcrc_copy_from_container_request *request,
                                            CopyFromContainerRequest *grequest)
    {
        if (request == nullptr) {
            return -1;
        }
        if (request->runtime != nullptr) {
            grequest->set_runtime(request->runtime);
        }
        if (request->id != nullptr) {
            grequest->set_id(request->id);
        }
        if (request->srcpath != nullptr) {
            grequest->set_srcpath(request->srcpath);
        }
        return 0;
    }
};
class CopyToContainerWriteToServerTask : public StoppableThread {
public:
explicit CopyToContainerWriteToServerTask(
const struct io_read_wrapper *reader,
std::shared_ptr<ClientReaderWriter<CopyToContainerRequest, CopyToContainerResponse>> stream)
: m_reader(reader), m_stream(stream)
{
}
~CopyToContainerWriteToServerTask() = default;
void run()
{
size_t len = ARCHIVE_BLOCK_SIZE;
char *buf = (char *)util_common_calloc_s(len);
if (buf == nullptr) {
ERROR("Out of memory");
m_stream->WritesDone();
return;
}
while (stopRequested() == false) {
ssize_t have_read_len = m_reader->read(m_reader->context, buf, len);
CopyToContainerRequest request;
request.set_data((const void*)buf, (size_t)have_read_len);
if (!m_stream->Write(request)) {
DEBUG("Server may be exited, stop send data");
break;
}
}
free(buf);
m_stream->WritesDone();
}
private:
const struct io_read_wrapper *m_reader;
std::shared_ptr<ClientReaderWriter<CopyToContainerRequest, CopyToContainerResponse>> m_stream;
};
// Client for the bidirectional CopyToContainer stream. The copy parameters
// travel as JSON in the call metadata; the archive data is written by a
// helper thread while this thread waits for the server's finish message.
class CopyToContainer
    : public ClientBase<ContainerService, ContainerService::Stub, lcrc_copy_to_container_request,
                        CopyToContainerRequest, lcrc_copy_to_container_response, CopyToContainerResponse> {
public:
    explicit CopyToContainer(void *args)
        : ClientBase(args)
    {
    }
    ~CopyToContainer() = default;

    // Serialise the copy request to JSON and attach it, together with the
    // TLS user name and mode, to the call metadata.
    // Returns 0 on success; -1 when the request or its id is missing, the JSON
    // cannot be generated (response->errmsg is set), or the common name cannot
    // be read from the certificate file.
    int set_custom_header_metadata(ClientContext &context, const struct lcrc_copy_to_container_request *request,
                                   struct lcrc_copy_to_container_response *response)
    {
        if (request == nullptr || request->id == nullptr) {
            ERROR("Missing container id in the request");
            return -1;
        }

        // The JSON view only borrows the request's strings; nothing is copied.
        container_copy_to_request copy = { 0 };
        copy.id = request->id;
        copy.runtime = request->runtime;
        copy.src_path = request->srcpath;
        copy.src_isdir = request->srcisdir;
        copy.src_rebase_name = request->srcrebase;
        copy.dst_path = request->dstpath;

        int result = -1;
        char *parse_err = nullptr;
        struct parser_context ctx = { OPT_GEN_SIMPLIFY, 0 };
        char *payload = container_copy_to_request_generate_json(&copy, &ctx, &parse_err);
        if (payload == nullptr) {
            format_errorf(&response->errmsg, "Can not generate json: %s", parse_err);
        } else {
            // Set common name from the TLS certificate
            char common_name_value[ClientBaseConstants::COMMON_NAME_LEN] = { 0 };
            if (get_common_name_from_tls_cert(m_certFile.c_str(), common_name_value,
                                              ClientBaseConstants::COMMON_NAME_LEN)) {
                ERROR("Failed to get common name in: %s", m_certFile.c_str());
            } else {
                context.AddMetadata("username", std::string(common_name_value, strlen(common_name_value)));
                context.AddMetadata("tls_mode", m_tlsMode);
                context.AddMetadata("isulad-copy-to-container", payload);
                result = 0;
            }
        }

        free(parse_err);
        free(payload);
        return result;
    }

    // Run the copy: start the writer thread, wait until the server reports
    // finish (or closes the stream), then stop and join the writer and check
    // the final status. Returns 0 on success, -1 otherwise.
    int run(const struct lcrc_copy_to_container_request *request, struct lcrc_copy_to_container_response *response)
    override
    {
        ClientContext context;
        if (set_custom_header_metadata(context, request, response) != 0) {
            ERROR("Failed to translate request to grpc");
            response->cc = LCRD_ERR_INPUT;
            return -1;
        }

        using StreamRSharedPtr = std::shared_ptr<ClientReaderWriter<CopyToContainerRequest, CopyToContainerResponse>>;
        StreamRSharedPtr stream(stub_->CopyToContainer(&context));

        CopyToContainerWriteToServerTask write_task(&request->reader, stream);
        std::thread writer([&write_task]() {
            write_task.run();
        });

        // Drain server messages until one is flagged as the final one.
        CopyToContainerResponse stream_response;
        while (stream->Read(&stream_response) && !stream_response.finish()) {
        }

        write_task.stop();
        writer.join();

        Status status = stream->Finish();
        if (status.ok()) {
            return 0;
        }
        ERROR("error_code: %d: %s", status.error_code(), status.error_message().c_str());
        unpackStatus(status, response);
        return -1;
    }
};
// Client for the server-streaming Logs RPC: each log entry received is
// printed to the local stdout or stderr.
class ContainerLogs : public ClientBase<ContainerService, ContainerService::Stub, lcrc_logs_request, LogsRequest,
                                        lcrc_logs_response, LogsResponse> {
public:
    explicit ContainerLogs(void *args)
        : ClientBase(args)
    {
    }
    ~ContainerLogs() = default;

    // Stream and print the container log. Returns 0 on success; -1 when the
    // common name cannot be read, the request cannot be translated, or the
    // stream ends with a non-OK status.
    int run(const struct lcrc_logs_request *request, struct lcrc_logs_response *response) override
    {
        ClientContext context;
        LogsRequest grequest;
        int ret = -1;

        // Set common name from the TLS certificate
        char common_name_value[ClientBaseConstants::COMMON_NAME_LEN] = { 0 };
        ret = get_common_name_from_tls_cert(m_certFile.c_str(), common_name_value,
                                            ClientBaseConstants::COMMON_NAME_LEN);
        if (ret != 0) {
            ERROR("Failed to get common name in: %s", m_certFile.c_str());
            return -1;
        }
        context.AddMetadata("username", std::string(common_name_value, strlen(common_name_value)));
        context.AddMetadata("tls_mode", m_tlsMode);

        if (logs_request_to_grpc(request, &grequest) != 0) {
            ERROR("Failed to transform container request to grpc");
            response->server_errono = LCRD_ERR_INPUT;
            return -1;
        }

        auto reader = stub_->Logs(&context, grequest);
        LogsResponse gresponse;
        while (reader->Read(&gresponse)) {
            show_container_log(request, gresponse);
        }

        Status status = reader->Finish();
        if (!status.ok()) {
            ERROR("error code: %d: %s", status.error_code(), status.error_message().c_str());
            unpackStatus(status, response);
            return -1;
        }
        return 0;
    }

private:
    // Print one log entry to std::cout or std::cerr depending on its stream
    // name, prefixed with its timestamp when request->timestamps is set.
    // Entries with any other stream name are reported and dropped.
    void show_container_log(const struct lcrc_logs_request *request, const LogsResponse &gresponse)
    {
        // A plain local is enough: the target is chosen afresh for every
        // entry, so there is no state to keep between calls.
        std::ostream *os = nullptr;
        if (gresponse.stream() == "stdout") {
            os = &std::cout;
        } else if (gresponse.stream() == "stderr") {
            os = &std::cerr;
        } else {
            ERROR("Invalid container log: %s", gresponse.stream().c_str());
            return;
        }

        if (request->timestamps) {
            (*os) << gresponse.time() << " ";
        }
        (*os) << gresponse.data();
    }

    // Copy id, runtime, since and until (each only when present) plus the
    // timestamps/follow/tail/details options into grequest.
    // Returns -1 when request is null, 0 otherwise.
    int logs_request_to_grpc(const struct lcrc_logs_request *request, LogsRequest *grequest)
    {
        if (request == nullptr) {
            return -1;
        }
        if (request->id != nullptr) {
            grequest->set_id(request->id);
        }
        if (request->runtime != nullptr) {
            grequest->set_runtime(request->runtime);
        }
        if (request->since != nullptr) {
            grequest->set_since(request->since);
        }
        if (request->until != nullptr) {
            grequest->set_until(request->until);
        }
        grequest->set_timestamps(request->timestamps);
        grequest->set_follow(request->follow);
        grequest->set_tail(request->tail);
        grequest->set_details(request->details);
        return 0;
    }
};
// Install the gRPC implementation of every container operation into ops.
// Returns -1 when ops is null, 0 otherwise.
int grpc_containers_client_ops_init(lcrc_connect_ops *ops)
{
    if (ops == nullptr) {
        return -1;
    }

    // implement following interface
    auto &container_ops = ops->container;

    container_ops.version = container_func<lcrc_version_request, lcrc_version_response, ContainerVersion>;
    container_ops.info = container_func<lcrc_info_request, lcrc_info_response, ContainerInfo>;
    container_ops.create = container_func<lcrc_create_request, lcrc_create_response, ContainerCreate>;
    container_ops.start = container_func<lcrc_start_request, lcrc_start_response, ContainerStart>;
    container_ops.remote_start = container_func<lcrc_start_request, lcrc_start_response, ContainerRemoteStart>;
    container_ops.stop = container_func<lcrc_stop_request, lcrc_stop_response, ContainerStop>;
    container_ops.restart = container_func<lcrc_restart_request, lcrc_restart_response, ContainerRestart>;
    container_ops.remove = container_func<lcrc_delete_request, lcrc_delete_response, ContainerDelete>;
    container_ops.list = container_func<lcrc_list_request, lcrc_list_response, ContainerList>;
    container_ops.exec = container_func<lcrc_exec_request, lcrc_exec_response, ContainerExec>;
    container_ops.remote_exec = container_func<lcrc_exec_request, lcrc_exec_response, ContainerRemoteExec>;
    container_ops.attach = container_func<lcrc_attach_request, lcrc_attach_response, ContainerAttach>;
    container_ops.pause = container_func<lcrc_pause_request, lcrc_pause_response, ContainerPause>;
    container_ops.resume = container_func<lcrc_resume_request, lcrc_resume_response, ContainerResume>;
    container_ops.update = container_func<lcrc_update_request, lcrc_update_response, ContainerUpdate>;
    container_ops.conf = container_func<lcrc_container_conf_request, lcrc_container_conf_response, ContainerConf>;
    container_ops.kill = container_func<lcrc_kill_request, lcrc_kill_response, ContainerKill>;
    container_ops.stats = container_func<lcrc_stats_request, lcrc_stats_response, ContainerStats>;
    container_ops.wait = container_func<lcrc_wait_request, lcrc_wait_response, ContainerWait>;
    container_ops.events = container_func<lcrc_events_request, lcrc_events_response, ContainerEvents>;
    container_ops.inspect = container_func<lcrc_inspect_request, lcrc_inspect_response, ContainerInspect>;
    container_ops.export_rootfs = container_func<lcrc_export_request, lcrc_export_response, ContainerExport>;
    container_ops.copy_from_container =
        container_func<lcrc_copy_from_container_request, lcrc_copy_from_container_response, CopyFromContainer>;
    container_ops.copy_to_container =
        container_func<lcrc_copy_to_container_request, lcrc_copy_to_container_response, CopyToContainer>;
    container_ops.top = container_func<lcrc_top_request, lcrc_top_response, ContainerTop>;
    container_ops.rename = container_func<lcrc_rename_request, lcrc_rename_response, ContainerRename>;
    container_ops.logs = container_func<lcrc_logs_request, lcrc_logs_response, ContainerLogs>;

    return 0;
}