/****************************************************************************** * Copyright (c) Huawei Technologies Co., Ltd. 2018-2019. All rights reserved. * iSulad licensed under the Mulan PSL v1. * You can use this software according to the terms and conditions of the Mulan PSL v1. * You may obtain a copy of Mulan PSL v1 at: * http://license.coscl.org.cn/MulanPSL * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR * IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR * PURPOSE. * See the Mulan PSL v1 for more details. * Author: lifeng * Create: 2018-11-08 * Description: provide grpc containers client functions ******************************************************************************/ #include "grpc_containers_client.h" #include #include #include #include #include #include "container_copy_to_request.h" #include "container_exec_request.h" #include "utils.h" #include "libtar.h" #include "stoppable_thread.h" #include "container.grpc.pb.h" #include "client_base.h" #include "pack_config.h" using namespace containers; using grpc::Channel; using grpc::ClientContext; using grpc::ClientReader; using grpc::ClientReaderWriter; using grpc::ClientWriter; using grpc::Status; using grpc::StatusCode; using google::protobuf::Timestamp; class ContainerVersion : public ClientBase { public: explicit ContainerVersion(void *args) : ClientBase(args) { } ~ContainerVersion() = default; int response_from_grpc(VersionResponse *gresponse, isula_version_response *response) override { if (!gresponse->version().empty()) { response->version = util_strdup_s(gresponse->version().c_str()); } if (!gresponse->git_commit().empty()) { response->git_commit = util_strdup_s(gresponse->git_commit().c_str()); } if (!gresponse->build_time().empty()) { response->build_time = util_strdup_s(gresponse->build_time().c_str()); } if (!gresponse->root_path().empty()) { response->root_path = util_strdup_s(gresponse->root_path().c_str()); } if (!gresponse->errmsg().empty()) { response->errmsg = util_strdup_s(gresponse->errmsg().c_str()); } response->server_errono = gresponse->cc(); return 0; } Status grpc_call(ClientContext *context, const VersionRequest &req, VersionResponse *reply) override { return stub_->Version(context, req, reply); } }; class ContainerInfo : public ClientBase { public: explicit ContainerInfo(void *args) : ClientBase(args) { } ~ContainerInfo() = default; int response_from_grpc(InfoResponse *gresponse, isula_info_response *response) override { if (!gresponse->version().empty()) { response->version = util_strdup_s(gresponse->version().c_str()); } response->containers_num = gresponse->containers_num(); response->c_running = gresponse->c_running(); response->c_paused = gresponse->c_paused(); response->c_stopped = gresponse->c_stopped(); response->images_num = gresponse->images_num(); get_os_info_from_grpc(response, gresponse); if (!gresponse->logging_driver().empty()) { response->logging_driver = util_strdup_s(gresponse->logging_driver().c_str()); } if (!gresponse->huge_page_size().empty()) { response->huge_page_size = util_strdup_s(gresponse->huge_page_size().c_str()); } if (!gresponse->isulad_root_dir().empty()) { response->isulad_root_dir = util_strdup_s(gresponse->isulad_root_dir().c_str()); } response->total_mem = gresponse->total_mem(); get_proxy_info_from_grpc(response, gresponse); get_driver_info_from_grpc(response, gresponse); return 0; } Status grpc_call(ClientContext *context, const InfoRequest &req, InfoResponse *reply) override { return stub_->Info(context, req, reply); } private: void get_os_info_from_grpc(isula_info_response *response, InfoResponse *gresponse) { if (!gresponse->kversion().empty()) { response->kversion = util_strdup_s(gresponse->kversion().c_str()); } if (!gresponse->os_type().empty()) { response->os_type = util_strdup_s(gresponse->os_type().c_str()); } if (!gresponse->architecture().empty()) { response->architecture = util_strdup_s(gresponse->architecture().c_str()); } if (!gresponse->nodename().empty()) { response->nodename = util_strdup_s(gresponse->nodename().c_str()); } response->cpus = gresponse->cpus(); if (!gresponse->operating_system().empty()) { response->operating_system = util_strdup_s(gresponse->operating_system().c_str()); } if (!gresponse->cgroup_driver().empty()) { response->cgroup_driver = util_strdup_s(gresponse->cgroup_driver().c_str()); } } void get_proxy_info_from_grpc(isula_info_response *response, InfoResponse *gresponse) { if (!gresponse->http_proxy().empty()) { response->http_proxy = util_strdup_s(gresponse->http_proxy().c_str()); } if (!gresponse->https_proxy().empty()) { response->https_proxy = util_strdup_s(gresponse->https_proxy().c_str()); } if (!gresponse->no_proxy().empty()) { response->no_proxy = util_strdup_s(gresponse->no_proxy().c_str()); } } void get_driver_info_from_grpc(isula_info_response *response, InfoResponse *gresponse) { if (!gresponse->driver_name().empty()) { response->driver_name = util_strdup_s(gresponse->driver_name().c_str()); } if (!gresponse->driver_status().empty()) { response->driver_status = util_strdup_s(gresponse->driver_status().c_str()); } } }; class ContainerCreate : public ClientBase { public: explicit ContainerCreate(void *args) : ClientBase(args) { } ~ContainerCreate() = default; int request_to_grpc(const isula_create_request *request, CreateRequest *grequest) override { int ret = 0; char *host_json = nullptr, *config_json = nullptr; if (request == nullptr) { return -1; } if (request->name != nullptr) { grequest->set_id(request->name); } if (request->rootfs != nullptr) { grequest->set_rootfs(request->rootfs); } if (request->image != nullptr) { grequest->set_image(request->image); } if (request->runtime != nullptr) { grequest->set_runtime(request->runtime); } ret = generate_hostconfig(request->hostconfig, &host_json); if (ret) { ERROR("Failed to pack host config"); return EINVALIDARGS; } grequest->set_hostconfig(host_json); free(host_json); ret = generate_container_config(request->config, &config_json); if (ret) { ERROR("Failed to pack custom config"); return EINVALIDARGS; } grequest->set_customconfig(config_json); free(config_json); return 0; } int response_from_grpc(CreateResponse *gresponse, isula_create_response *response) override { response->server_errono = gresponse->cc(); if (!gresponse->errmsg().empty()) { response->errmsg = util_strdup_s(gresponse->errmsg().c_str()); } if (!gresponse->id().empty()) { response->id = util_strdup_s(gresponse->id().c_str()); } return 0; } int check_parameter(const CreateRequest &req) override { int nret = -1; if (req.runtime().empty()) { ERROR("Missing runtime in the request"); return nret; } if (req.rootfs().empty() && req.image().empty()) { ERROR("Missing container rootfs or image arguments in the request"); return nret; } if (req.hostconfig().empty()) { ERROR("Missing hostconfig in the request"); return nret; } if (req.customconfig().empty()) { ERROR("Missing customconfig in the request"); return nret; } return 0; } Status grpc_call(ClientContext *context, const CreateRequest &req, CreateResponse *reply) override { return stub_->Create(context, req, reply); } }; class ContainerStart : public ClientBase { public: explicit ContainerStart(void *args) : ClientBase(args) { } ~ContainerStart() = default; int request_to_grpc(const isula_start_request *request, StartRequest *grequest) override { if (request == nullptr) { return -1; } if (request->name != nullptr) { grequest->set_id(request->name); } if (request->stdin != nullptr) { grequest->set_stdin(request->stdin); } if (request->stdout != nullptr) { grequest->set_stdout(request->stdout); } if (request->stderr != nullptr) { grequest->set_stderr(request->stderr); } grequest->set_attach_stdin(request->attach_stdin); grequest->set_attach_stdout(request->attach_stdout); grequest->set_attach_stderr(request->attach_stderr); return 0; } int response_from_grpc(StartResponse *gresponse, struct isula_start_response *response) override { response->server_errono = gresponse->cc(); if (!gresponse->errmsg().empty()) { response->errmsg = util_strdup_s(gresponse->errmsg().c_str()); } return 0; } int check_parameter(const StartRequest &req) override { if (req.id().empty()) { ERROR("Missing container name in the request"); return -1; } return 0; } Status grpc_call(ClientContext *context, const StartRequest &req, StartResponse *reply) override { return stub_->Start(context, req, reply); } }; class RemoteStartWriteToServerTask : public StoppableThread { public: explicit RemoteStartWriteToServerTask( std::shared_ptr> stream) : m_stream(stream) { } ~RemoteStartWriteToServerTask() = default; void run() { while (stopRequested() == false) { int cmd; cmd = getchar(); RemoteStartRequest request; if (cmd == EOF) { request.set_finish(true); } else { char in = (char)cmd; request.set_stdin(&in, 1); } if (!m_stream->Write(request)) { ERROR("Failed to write request to grpc server"); break; } if (cmd == EOF) { break; } } } private: std::shared_ptr> m_stream; }; class ContainerRemoteStart : public ClientBase { public: explicit ContainerRemoteStart(void *args) : ClientBase(args) { } ~ContainerRemoteStart() = default; int set_custom_header_metadata(ClientContext &context, const struct isula_start_request *request) { if (request == nullptr || request->name == nullptr) { ERROR("Missing container id in the request"); return -1; } // Set common name from cert.perm char common_name_value[ClientBaseConstants::COMMON_NAME_LEN] = { 0 }; int ret = get_common_name_from_tls_cert(m_certFile.c_str(), common_name_value, ClientBaseConstants::COMMON_NAME_LEN); if (ret != 0) { ERROR("Failed to get common name in: %s", m_certFile.c_str()); return -1; } context.AddMetadata("username", std::string(common_name_value, strlen(common_name_value))); context.AddMetadata("tls_mode", m_tlsMode); context.AddMetadata("container-id", std::string(request->name)); context.AddMetadata("attach-stdin", request->attach_stdin ? "true" : "false"); context.AddMetadata("attach-stdout", request->attach_stdout ? "true" : "false"); context.AddMetadata("attach-stderr", request->attach_stderr ? "true" : "false"); return 0; } void get_server_trailing_metadata(ClientContext &context, isula_start_response *response) { auto metadata = context.GetServerTrailingMetadata(); auto cc = metadata.find("cc"); if (cc != metadata.end()) { auto tmpstr = std::string(cc->second.data(), cc->second.length()); response->server_errono = (uint32_t)std::stoul(tmpstr, nullptr, 0); } auto errmsg = metadata.find("errmsg"); if (errmsg != metadata.end()) { auto tmpstr = std::string(errmsg->second.data(), errmsg->second.length()); response->errmsg = util_strdup_s(tmpstr.c_str()); } } int run(const struct isula_start_request *request, struct isula_start_response *response) override { ClientContext context; if (set_custom_header_metadata(context, request) != 0) { ERROR("Failed to translate request to grpc"); response->cc = ISULAD_ERR_INPUT; return -1; } using StreamStartRWSharedPtr = std::shared_ptr>; StreamStartRWSharedPtr stream(stub_->RemoteStart(&context)); RemoteStartWriteToServerTask write_task(stream); std::thread writer; if (request->attach_stdin) { writer = std::thread([&]() { write_task.run(); }); } RemoteStartResponse stream_response; if (request->attach_stdout || request->attach_stderr) { while (stream->Read(&stream_response)) { if (stream_response.finish()) { break; } if (!stream_response.stdout().empty()) { std::cout << stream_response.stdout() << std::flush; } if (!stream_response.stderr().empty()) { std::cerr << stream_response.stderr() << std::flush; } } } write_task.stop(); stream->WritesDone(); Status status = stream->Finish(); if (!status.ok()) { ERROR("error_code: %d: %s", status.error_code(), status.error_message().c_str()); unpackStatus(status, response); goto out; } get_server_trailing_metadata(context, response); if (response->server_errono != ISULAD_SUCCESS) { response->cc = ISULAD_ERR_EXEC; goto out; } out: if (request->attach_stdin) { pthread_cancel(writer.native_handle()); if (writer.joinable()) { writer.join(); } } return (response->cc == ISULAD_SUCCESS) ? 0 : -1; } }; class ContainerTop : public ClientBase { public: explicit ContainerTop(void *args) : ClientBase(args) { } ~ContainerTop() = default; int request_to_grpc(const isula_top_request *request, TopRequest *grequest) override { if (request == nullptr) { return -1; } if (request->name != nullptr) { grequest->set_id(request->name); } if (request->ps_argc > 0) { for (int i = 0; i < request->ps_argc; i++) { grequest->add_args(request->ps_args[i]); } } return 0; } int response_from_grpc(TopResponse *gresponse, struct isula_top_response *response) override { int i = 0; int num = gresponse->processes_size(); if (num <= 0) { response->titles = nullptr; response->processes_len = 0; response->server_errono = gresponse->cc(); if (!gresponse->errmsg().empty()) { response->errmsg = util_strdup_s(gresponse->errmsg().c_str()); } return 0; } response->server_errono = gresponse->cc(); if (!gresponse->errmsg().empty()) { response->errmsg = util_strdup_s(gresponse->errmsg().c_str()); } if (!gresponse->titles().empty()) { response->titles = util_strdup_s(gresponse->titles().c_str()); } if ((size_t)num > SIZE_MAX / sizeof(char *)) { ERROR("Too many summary info!"); return -1; } response->processes = (char **)util_common_calloc_s(num * sizeof(char *)); if (response->processes == nullptr) { ERROR("out of memory"); response->cc = ISULAD_ERR_MEMOUT; return -1; } for (i = 0; i < num; i++) { response->processes[i] = util_strdup_s(gresponse->processes(i).c_str()); } response->processes_len = (size_t)gresponse->processes_size(); return 0; } int check_parameter(const TopRequest &req) override { if (req.id().empty()) { ERROR("Missing container name in the request"); return -1; } return 0; } Status grpc_call(ClientContext *context, const TopRequest &req, TopResponse *reply) override { return stub_->Top(context, req, reply); } }; class ContainerStop : public ClientBase { public: explicit ContainerStop(void *args) : ClientBase(args) { } ~ContainerStop() = default; int request_to_grpc(const isula_stop_request *request, StopRequest *grequest) override { if (request == nullptr) { return -1; } if (request->name != nullptr) { grequest->set_id(request->name); } grequest->set_force(request->force); grequest->set_timeout(request->timeout); return 0; } int response_from_grpc(StopResponse *gresponse, isula_stop_response *response) override { response->server_errono = gresponse->cc(); if (!gresponse->errmsg().empty()) { response->errmsg = util_strdup_s(gresponse->errmsg().c_str()); } return 0; } int check_parameter(const StopRequest &req) override { if (req.id().empty()) { ERROR("Missing container name in the request"); return -1; } return 0; } Status grpc_call(ClientContext *context, const StopRequest &req, StopResponse *reply) override { return stub_->Stop(context, req, reply); } }; class ContainerRename : public ClientBase { public: explicit ContainerRename(void *args) : ClientBase(args) { } ~ContainerRename() = default; int request_to_grpc(const isula_rename_request *request, RenameRequest *grequest) override { if (request == nullptr) { return -1; } if (request->old_name != nullptr) { grequest->set_oldname(request->old_name); } if (request->new_name != nullptr) { grequest->set_newname(request->new_name); } return 0; } int response_from_grpc(RenameResponse *gresponse, isula_rename_response *response) override { response->server_errono = gresponse->cc(); if (!gresponse->errmsg().empty()) { response->errmsg = util_strdup_s(gresponse->errmsg().c_str()); } return 0; } int check_parameter(const RenameRequest &req) override { if (req.oldname().empty()) { ERROR("Missing container old name in the request"); return -1; } if (req.newname().empty()) { ERROR("Missing container new name in the request"); return -1; } return 0; } Status grpc_call(ClientContext *context, const RenameRequest &req, RenameResponse *reply) override { return stub_->Rename(context, req, reply); } }; class ContainerResize : public ClientBase { public: explicit ContainerResize(void *args) : ClientBase(args) { } ~ContainerResize() = default; int request_to_grpc(const isula_resize_request *request, ResizeRequest *grequest) override { if (request == nullptr) { return -1; } if (request->id != nullptr) { grequest->set_id(request->id); } if (request->suffix != nullptr) { grequest->set_suffix(request->suffix); } grequest->set_height(request->height); grequest->set_width(request->width); return 0; } int response_from_grpc(ResizeResponse *gresponse, isula_resize_response *response) override { response->server_errono = gresponse->cc(); if (!gresponse->errmsg().empty()) { response->errmsg = util_strdup_s(gresponse->errmsg().c_str()); } return 0; } int check_parameter(const ResizeRequest &req) override { if (req.id().empty()) { ERROR("Missing container id in the request"); return -1; } return 0; } Status grpc_call(ClientContext *context, const ResizeRequest &req, ResizeResponse *reply) override { return stub_->Resize(context, req, reply); } }; class ContainerRestart : public ClientBase { public: explicit ContainerRestart(void *args) : ClientBase(args) { } ~ContainerRestart() = default; int request_to_grpc(const isula_restart_request *request, RestartRequest *grequest) override { if (request == nullptr) { return -1; } if (request->name != nullptr) { grequest->set_id(request->name); } grequest->set_timeout((int32_t)(request->timeout)); return 0; } int response_from_grpc(RestartResponse *gresponse, isula_restart_response *response) override { response->server_errono = gresponse->cc(); if (!gresponse->errmsg().empty()) { response->errmsg = util_strdup_s(gresponse->errmsg().c_str()); } return 0; } int check_parameter(const RestartRequest &req) override { if (req.id().empty()) { ERROR("Missing container name in request"); return -1; } return 0; } Status grpc_call(ClientContext *context, const RestartRequest &req, RestartResponse *reply) override { return stub_->Restart(context, req, reply); } }; class ContainerKill : public ClientBase { public: explicit ContainerKill(void *args) : ClientBase(args) { } ~ContainerKill() = default; int request_to_grpc(const isula_kill_request *request, KillRequest *grequest) override { if (request == nullptr) { return -1; } if (request->name != nullptr) { grequest->set_id(request->name); } grequest->set_signal(request->signal); return 0; } int response_from_grpc(KillResponse *gresponse, isula_kill_response *response) override { response->server_errono = gresponse->cc(); if (!gresponse->errmsg().empty()) { response->errmsg = util_strdup_s(gresponse->errmsg().c_str()); } return 0; } int check_parameter(const KillRequest &req) override { if (req.id().empty()) { ERROR("Missing container name in the request"); return -1; } return 0; } Status grpc_call(ClientContext *context, const KillRequest &req, KillResponse *reply) override { return stub_->Kill(context, req, reply); } }; class ContainerExec : public ClientBase { public: explicit ContainerExec(void *args) : ClientBase(args) { } ~ContainerExec() = default; int request_to_grpc(const isula_exec_request *request, ExecRequest *grequest) override { if (request == nullptr) { return -1; } if (request->name != nullptr) { grequest->set_container_id(request->name); } if (request->suffix != nullptr) { grequest->set_suffix(request->suffix); } grequest->set_tty(request->tty); grequest->set_open_stdin(request->open_stdin); grequest->set_attach_stdin(request->attach_stdin); grequest->set_attach_stdout(request->attach_stdout); grequest->set_attach_stderr(request->attach_stderr); if (request->stdin != nullptr) { grequest->set_stdin(request->stdin); } if (request->stdout != nullptr) { grequest->set_stdout(request->stdout); } if (request->stderr != nullptr) { grequest->set_stderr(request->stderr); } for (int i = 0; i < request->argc; i++) { grequest->add_argv(request->argv[i]); } for (size_t i = 0; i < request->env_len; i++) { grequest->add_env(request->env[i]); } if (request->user != nullptr) { grequest->set_user(request->user); } return 0; } int response_from_grpc(ExecResponse *gresponse, isula_exec_response *response) override { response->server_errono = gresponse->cc(); response->exit_code = gresponse->exit_code(); if (!gresponse->errmsg().empty()) { response->errmsg = util_strdup_s(gresponse->errmsg().c_str()); } return 0; } int check_parameter(const ExecRequest &req) override { if (req.container_id().empty()) { ERROR("Missing container name in the request"); return -1; } return 0; } Status grpc_call(ClientContext *context, const ExecRequest &req, ExecResponse *reply) override { return stub_->Exec(context, req, reply); } }; class RemoteExecWriteToServerTask : public StoppableThread { public: explicit RemoteExecWriteToServerTask( std::shared_ptr> stream) : m_stream(stream) { } ~RemoteExecWriteToServerTask() = default; void run() { while (stopRequested() == false) { int cmd; cmd = getchar(); RemoteExecRequest request; if (cmd == EOF) { request.set_finish(true); } else { char in = (char)cmd; request.add_cmd(&in, 1); } if (!m_stream->Write(request)) { ERROR("Failed to write request to grpc server"); break; } if (cmd == EOF) { break; } } } private: std::shared_ptr> m_stream; }; class ContainerRemoteExec : public ClientBase { public: explicit ContainerRemoteExec(void *args) : ClientBase(args) { } ~ContainerRemoteExec() = default; int set_custom_header_metadata(ClientContext &context, const struct isula_exec_request *request, struct isula_exec_response *response) { int ret = 0; char *json = nullptr; parser_error err = nullptr; container_exec_request exec = { 0 }; struct parser_context ctx = { OPT_GEN_SIMPLIFY, 0 }; // Set common name from cert.perm char common_name_value[ClientBaseConstants::COMMON_NAME_LEN] = { 0 }; if (request == nullptr || request->name == nullptr) { ERROR("Missing container id in the request"); return -1; } exec.container_id = request->name; exec.tty = request->tty; exec.attach_stdin = request->attach_stdin; exec.attach_stdout = request->attach_stdout; exec.attach_stderr = request->attach_stderr; exec.timeout = request->timeout; exec.argv = request->argv; exec.argv_len = (size_t)request->argc; exec.env = request->env; exec.env_len = request->env_len; exec.suffix = request->suffix; json = container_exec_request_generate_json(&exec, &ctx, &err); if (json == nullptr) { format_errorf(&response->errmsg, "Can not generate json: %s", err); ret = -1; goto out; } ret = get_common_name_from_tls_cert(m_certFile.c_str(), common_name_value, ClientBaseConstants::COMMON_NAME_LEN); if (ret != 0) { ERROR("Failed to get common name in: %s", m_certFile.c_str()); ret = -1; goto out; } context.AddMetadata("username", std::string(common_name_value, strlen(common_name_value))); context.AddMetadata("tls_mode", m_tlsMode); context.AddMetadata("isulad-remote-exec", json); out: free(err); free(json); return ret; } void get_server_trailing_metadata(ClientContext &context, isula_exec_response *response) { auto metadata = context.GetServerTrailingMetadata(); auto cc = metadata.find("cc"); if (cc != metadata.end()) { auto tmpstr = std::string(cc->second.data(), cc->second.length()); response->server_errono = (uint32_t)std::stoul(tmpstr, nullptr, 0); } auto exit_code = metadata.find("exit_code"); if (exit_code != metadata.end()) { auto tmpstr = std::string(exit_code->second.data(), exit_code->second.length()); response->exit_code = (uint32_t)std::stoul(tmpstr, nullptr, 0); } auto errmsg = metadata.find("errmsg"); if (errmsg != metadata.end()) { auto tmpstr = std::string(errmsg->second.data(), errmsg->second.length()); response->errmsg = util_strdup_s(tmpstr.c_str()); } } int run(const struct isula_exec_request *request, struct isula_exec_response *response) override { ClientContext context; if (set_custom_header_metadata(context, request, response) != 0) { ERROR("Failed to translate request to grpc"); response->cc = ISULAD_ERR_INPUT; return -1; } std::shared_ptr> stream(stub_->RemoteExec(&context)); RemoteExecWriteToServerTask write_task(stream); std::thread writer([&]() { write_task.run(); }); RemoteExecResponse stream_response; while (stream->Read(&stream_response)) { if (stream_response.finish()) { break; } std::cout << stream_response.stdout() << std::flush; } write_task.stop(); stream->WritesDone(); Status status = stream->Finish(); if (!status.ok()) { ERROR("error_code: %d: %s", status.error_code(), status.error_message().c_str()); unpackStatus(status, response); goto out; } get_server_trailing_metadata(context, response); if (response->server_errono != ISULAD_SUCCESS) { response->cc = ISULAD_ERR_EXEC; goto out; } out: pthread_cancel(writer.native_handle()); if (writer.joinable()) { writer.join(); } return (response->cc == ISULAD_SUCCESS) ? 0 : -1; } }; class ContainerInspect : public ClientBase { public: explicit ContainerInspect(void *args) : ClientBase(args) { } ~ContainerInspect() = default; int request_to_grpc(const isula_inspect_request *request, InspectContainerRequest *grequest) override { if (request == nullptr) { return -1; } if (request->name != nullptr) { grequest->set_id(request->name); } grequest->set_bformat(request->bformat); grequest->set_timeout(request->timeout); return 0; } int response_from_grpc(InspectContainerResponse *gresponse, isula_inspect_response *response) override { response->server_errono = gresponse->cc(); if (!gresponse->containerjson().empty()) { response->json = util_strdup_s(gresponse->containerjson().c_str()); } if (!gresponse->errmsg().empty()) { response->errmsg = util_strdup_s(gresponse->errmsg().c_str()); } return 0; } int check_parameter(const InspectContainerRequest &req) override { if (req.id().empty()) { ERROR("Missing container name in the request"); return -1; } return 0; } Status grpc_call(ClientContext *context, const InspectContainerRequest &req, InspectContainerResponse *reply) override { return stub_->Inspect(context, req, reply); } }; class ContainerDelete : public ClientBase { public: explicit ContainerDelete(void *args) : ClientBase(args) { } ~ContainerDelete() = default; int request_to_grpc(const isula_delete_request *request, DeleteRequest *grequest) override { if (request == nullptr) { return -1; } if (request->name != nullptr) { grequest->set_id(request->name); } grequest->set_force(request->force); return 0; } int response_from_grpc(DeleteResponse *gresponse, isula_delete_response *response) override { response->server_errono = gresponse->cc(); if (!gresponse->id().empty()) { response->name = util_strdup_s(gresponse->id().c_str()); } if (!gresponse->errmsg().empty()) { response->errmsg = util_strdup_s(gresponse->errmsg().c_str()); } return 0; } int check_parameter(const DeleteRequest &req) override { if (req.id().empty()) { ERROR("Missing container name in the request"); return -1; } return 0; } Status grpc_call(ClientContext *context, const DeleteRequest &req, DeleteResponse *reply) override { return stub_->Delete(context, req, reply); } }; class ContainerList : public ClientBase { public: explicit ContainerList(void *args) : ClientBase(args) { } ~ContainerList() = default; int request_to_grpc(const isula_list_request *request, ListRequest *grequest) override { if (request == nullptr) { return -1; } if (request->filters != nullptr) { google::protobuf::Map *map; map = grequest->mutable_filters(); for (size_t i = 0; i < request->filters->len; i++) { (*map)[request->filters->keys[i]] = request->filters->values[i]; } } grequest->set_all(request->all); return 0; } int response_from_grpc(ListResponse *gresponse, isula_list_response *response) override { int i = 0; int num = gresponse->containers_size(); if (num <= 0) { response->container_summary = nullptr; response->container_num = 0; response->server_errono = gresponse->cc(); if (!gresponse->errmsg().empty()) { response->errmsg = util_strdup_s(gresponse->errmsg().c_str()); } return 0; } if ((size_t)num > SIZE_MAX / sizeof(isula_container_summary_info *)) { ERROR("Too many summary info!"); return -1; } response->container_summary = (struct isula_container_summary_info **)util_common_calloc_s( sizeof(struct isula_container_summary_info *) * (size_t)num); if (response->container_summary == nullptr) { ERROR("out of memory"); response->cc = ISULAD_ERR_MEMOUT; return -1; } for (i = 0; i < num; i++) { if (get_container_summary_from_grpc(response, gresponse, i)) { return -1; } } return 0; } Status grpc_call(ClientContext *context, const ListRequest &req, ListResponse *reply) override { return stub_->List(context, req, reply); } private: int get_container_summary_from_grpc(isula_list_response *response, ListResponse *gresponse, int index) { response->container_summary[index] = (struct isula_container_summary_info *)util_common_calloc_s(sizeof(struct isula_container_summary_info)); if (response->container_summary[index] == nullptr) { ERROR("out of memory"); response->cc = ISULAD_ERR_MEMOUT; return -1; } const Container &in = gresponse->containers(index); const char *id = !in.id().empty() ? in.id().c_str() : "-"; response->container_summary[index]->id = util_strdup_s(id); const char *name = !in.name().empty() ? in.name().c_str() : "-"; response->container_summary[index]->name = util_strdup_s(name); response->container_summary[index]->runtime = !in.runtime().empty() ? util_strdup_s(in.runtime().c_str()) : nullptr; response->container_summary[index]->has_pid = (int)in.pid() != 0; response->container_summary[index]->pid = (uint32_t)in.pid(); response->container_summary[index]->status = (Container_Status)in.status(); response->container_summary[index]->image = !in.image().empty() ? util_strdup_s(in.image().c_str()) : util_strdup_s("none"); response->container_summary[index]->command = !in.command().empty() ? util_strdup_s(in.command().c_str()) : util_strdup_s("-"); const char *starttime = !in.startat().empty() ? in.startat().c_str() : "-"; response->container_summary[index]->startat = util_strdup_s(starttime); const char *finishtime = !in.finishat().empty() ? in.finishat().c_str() : "-"; response->container_summary[index]->finishat = util_strdup_s(finishtime); response->container_summary[index]->exit_code = in.exit_code(); response->container_summary[index]->restart_count = (uint32_t)(in.restartcount()); response->container_summary[index]->created = (int64_t)in.created(); std::string healthState { "" }; if (!in.health_state().empty()) { healthState = "(" + in.health_state() + ")"; } response->container_summary[index]->health_state = !healthState.empty() ? util_strdup_s(healthState.c_str()) : nullptr; response->container_num++; return 0; } }; class ContainerWait : public ClientBase { public: explicit ContainerWait(void *args) : ClientBase(args) { } ~ContainerWait() = default; int request_to_grpc(const isula_wait_request *request, WaitRequest *grequest) override { if (request == nullptr) { return -1; } if (request->id != nullptr) { grequest->set_id(request->id); } grequest->set_condition(request->condition); return 0; } int response_from_grpc(WaitResponse *gresponse, isula_wait_response *response) override { response->exit_code = (int)gresponse->exit_code(); response->server_errono = gresponse->cc(); if (!gresponse->errmsg().empty()) { response->errmsg = util_strdup_s(gresponse->errmsg().c_str()); } return 0; } int check_parameter(const WaitRequest &req) override { if (req.id().empty()) { ERROR("Missing container name in the request"); return -1; } return 0; } Status grpc_call(ClientContext *context, const WaitRequest &req, WaitResponse *reply) override { return stub_->Wait(context, req, reply); } }; class AttachWriteToServerTask : public StoppableThread { public: explicit AttachWriteToServerTask(std::shared_ptr> stream) : m_stream(stream) { } ~AttachWriteToServerTask() = default; void run() { while (stopRequested() == false) { int cmd; cmd = getchar(); AttachRequest request; if (cmd == EOF) { request.set_finish(true); } else { char in = (char)cmd; request.set_stdin(&in, 1); } if (!m_stream->Write(request)) { ERROR("Failed to write request to grpc server"); break; } if (cmd == EOF) { break; } } } private: std::shared_ptr> m_stream; }; class ContainerAttach : public ClientBase { public: explicit ContainerAttach(void *args) : ClientBase(args) { } ~ContainerAttach() = default; int set_custom_header_metadata(ClientContext &context, const struct isula_attach_request *request) { if (request == nullptr || request->name == nullptr) { ERROR("Missing container id in the request"); return -1; } // Set common name from cert.perm char common_name_value[ClientBaseConstants::COMMON_NAME_LEN] = { 0 }; int ret = get_common_name_from_tls_cert(m_certFile.c_str(), common_name_value, ClientBaseConstants::COMMON_NAME_LEN); if (ret != 0) { ERROR("Failed to get common name in: %s", m_certFile.c_str()); return -1; } context.AddMetadata("username", std::string(common_name_value, strlen(common_name_value))); context.AddMetadata("tls_mode", m_tlsMode); context.AddMetadata("container-id", std::string(request->name)); context.AddMetadata("attach-stdin", request->attach_stdin ? "true" : "false"); context.AddMetadata("attach-stdout", request->attach_stdout ? "true" : "false"); context.AddMetadata("attach-stderr", request->attach_stderr ? "true" : "false"); return 0; } void get_server_trailing_metadata(ClientContext &context, isula_attach_response *response) { auto metadata = context.GetServerTrailingMetadata(); auto cc = metadata.find("cc"); if (cc != metadata.end()) { auto tmpstr = std::string(cc->second.data(), cc->second.length()); response->server_errono = (uint32_t)std::stoul(tmpstr, nullptr, 0); } auto errmsg = metadata.find("errmsg"); if (errmsg != metadata.end()) { auto tmpstr = std::string(errmsg->second.data(), errmsg->second.length()); response->errmsg = util_strdup_s(tmpstr.c_str()); } } int run(const struct isula_attach_request *request, struct isula_attach_response *response) override { ClientContext context; if (set_custom_header_metadata(context, request) != 0) { ERROR("Failed to translate request to grpc"); response->cc = ISULAD_ERR_INPUT; return -1; } std::shared_ptr> stream(stub_->Attach(&context)); AttachWriteToServerTask write_task(stream); std::thread writer([&]() { write_task.run(); }); if (request->attach_stdin) { AttachResponse stream_response; while (stream->Read(&stream_response)) { if (stream_response.finish()) { break; } if (!stream_response.stdout().empty()) { std::cout << stream_response.stdout() << std::flush; } if (!stream_response.stderr().empty()) { std::cerr << stream_response.stderr() << std::flush; } } } write_task.stop(); stream->WritesDone(); Status status = stream->Finish(); if (!status.ok()) { ERROR("error_code: %d: %s", status.error_code(), status.error_message().c_str()); unpackStatus(status, response); goto out; } get_server_trailing_metadata(context, response); if (response->server_errono != ISULAD_SUCCESS) { response->cc = ISULAD_ERR_EXEC; } out: if (request->attach_stdin) { pthread_cancel(writer.native_handle()); if (writer.joinable()) { writer.join(); } } return (response->cc == ISULAD_SUCCESS) ? 0 : -1; } }; class ContainerPause : public ClientBase { public: explicit ContainerPause(void *args) : ClientBase(args) { } ~ContainerPause() = default; int request_to_grpc(const isula_pause_request *request, PauseRequest *grequest) override { if (request == nullptr) { return -1; } if (request->name != nullptr) { grequest->set_id(request->name); } return 0; } int response_from_grpc(PauseResponse *gresponse, isula_pause_response *response) override { response->server_errono = gresponse->cc(); if (!gresponse->errmsg().empty()) { response->errmsg = util_strdup_s(gresponse->errmsg().c_str()); } return 0; } int check_parameter(const PauseRequest &req) override { if (req.id().empty()) { ERROR("Missing container name in the request"); return -1; } return 0; } Status grpc_call(ClientContext *context, const PauseRequest &req, PauseResponse *reply) override { return stub_->Pause(context, req, reply); } }; class ContainerResume : public ClientBase { public: explicit ContainerResume(void *args) : ClientBase(args) { } ~ContainerResume() = default; int request_to_grpc(const isula_resume_request *request, ResumeRequest *grequest) override { if (request == nullptr) { return -1; } if (request->name != nullptr) { grequest->set_id(request->name); } return 0; } int response_from_grpc(ResumeResponse *gresponse, isula_resume_response *response) override { response->server_errono = gresponse->cc(); if (!gresponse->errmsg().empty()) { response->errmsg = util_strdup_s(gresponse->errmsg().c_str()); } return 0; } int check_parameter(const ResumeRequest &req) override { if (req.id().empty()) { ERROR("Missing container name in the request"); return -1; } return 0; } Status grpc_call(ClientContext *context, const ResumeRequest &req, ResumeResponse *reply) override { return stub_->Resume(context, req, reply); } }; class ContainerExport : public ClientBase { public: explicit ContainerExport(void *args) : ClientBase(args) { } ~ContainerExport() = default; int request_to_grpc(const isula_export_request *request, ExportRequest *grequest) override { if (request == nullptr) { return -1; } if (request->name != nullptr) { grequest->set_id(request->name); } if (request->file != nullptr) { grequest->set_file(request->file); } return 0; } int response_from_grpc(ExportResponse *gresponse, isula_export_response *response) override { response->server_errono = gresponse->cc(); if (!gresponse->errmsg().empty()) { response->errmsg = util_strdup_s(gresponse->errmsg().c_str()); } return 0; } int check_parameter(const ExportRequest &req) override { if (req.id().empty()) { ERROR("Missing container name in the request"); return -1; } if (req.file().empty()) { ERROR("Missing output file path in the request"); return -1; } return 0; } Status grpc_call(ClientContext *context, const ExportRequest &req, ExportResponse *reply) override { return stub_->Export(context, req, reply); } }; class ContainerUpdate : public ClientBase { public: explicit ContainerUpdate(void *args) : ClientBase(args) { } ~ContainerUpdate() = default; int request_to_grpc(const isula_update_request *request, UpdateRequest *grequest) override { int ret = 0; char *json = nullptr; if (request == nullptr) { return -1; } isula_host_config_t hostconfig; (void)memset(&hostconfig, 0, sizeof(hostconfig)); if (request->updateconfig) { hostconfig.restart_policy = request->updateconfig->restart_policy; hostconfig.cr = request->updateconfig->cr; } ret = generate_hostconfig(&hostconfig, &json); if (ret != 0) { ERROR("Failed to generate hostconfig json"); ret = -1; goto cleanup; } grequest->set_hostconfig(json); if (request->name != nullptr) { grequest->set_id(request->name); } cleanup: free(json); return ret; } int response_from_grpc(UpdateResponse *gresponse, isula_update_response *response) override { response->server_errono = gresponse->cc(); if (!gresponse->errmsg().empty()) { response->errmsg = util_strdup_s(gresponse->errmsg().c_str()); } return 0; } int check_parameter(const UpdateRequest &req) override { if (req.id().empty()) { ERROR("Missing container name in the request"); return -1; } return 0; } Status grpc_call(ClientContext *context, const UpdateRequest &req, UpdateResponse *reply) override { return stub_->Update(context, req, reply); } }; class ContainerStats : public ClientBase { public: explicit ContainerStats(void *args) : ClientBase(args) { } ~ContainerStats() = default; int request_to_grpc(const isula_stats_request *request, StatsRequest *grequest) override { if (request == nullptr) { return -1; } for (size_t i = 0; request->containers != nullptr && i < request->containers_len; i++) { grequest->add_containers(request->containers[i]); } grequest->set_all(request->all); return 0; } int response_from_grpc(StatsResponse *gresponse, isula_stats_response *response) override { int size = gresponse->containers_size(); if (size > 0) { response->container_stats = static_cast(util_common_calloc_s(size * sizeof(struct isula_container_info))); if (response->container_stats == nullptr) { ERROR("Out of memory"); return -1; } for (int i = 0; i < size; i++) { if (!gresponse->containers(i).id().empty()) { response->container_stats[i].id = util_strdup_s(gresponse->containers(i).id().c_str()); } response->container_stats[i].pids_current = gresponse->containers(i).pids_current(); response->container_stats[i].cpu_use_nanos = gresponse->containers(i).cpu_use_nanos(); response->container_stats[i].cpu_system_use = gresponse->containers(i).cpu_system_use(); response->container_stats[i].online_cpus = gresponse->containers(i).online_cpus(); response->container_stats[i].blkio_read = gresponse->containers(i).blkio_read(); response->container_stats[i].blkio_write = gresponse->containers(i).blkio_write(); response->container_stats[i].mem_used = gresponse->containers(i).mem_used(); response->container_stats[i].mem_limit = gresponse->containers(i).mem_limit(); response->container_stats[i].kmem_used = gresponse->containers(i).kmem_used(); response->container_stats[i].kmem_limit = gresponse->containers(i).kmem_limit(); } response->container_num = (size_t)size; } response->server_errono = gresponse->cc(); if (!gresponse->errmsg().empty()) { response->errmsg = util_strdup_s(gresponse->errmsg().c_str()); } return 0; } int check_parameter(const StatsRequest &req) override { return 0; } Status grpc_call(ClientContext *context, const StatsRequest &req, StatsResponse *reply) override { return stub_->Stats(context, req, reply); } }; class ContainerEvents : public ClientBase { public: explicit ContainerEvents(void *args) : ClientBase(args) { } ~ContainerEvents() = default; int run(const struct isula_events_request *request, struct isula_events_response *response) override { int ret; EventsRequest req; Event event; ClientContext context; Status status; container_events_format_t isula_event; ret = events_request_to_grpc(request, &req); if (ret != 0) { ERROR("Failed to translate request to grpc"); response->server_errono = ISULAD_ERR_INPUT; return -1; } std::unique_ptr> reader(stub_->Events(&context, req)); while (reader->Read(&event)) { event_from_grpc(&isula_event, &event); if (request->cb != nullptr) { request->cb(&isula_event); } } status = reader->Finish(); if (!status.ok()) { ERROR("error_code: %d: %s", status.error_code(), status.error_message().c_str()); unpackStatus(status, response); return -1; } if (response->server_errono != ISULAD_SUCCESS) { response->cc = ISULAD_ERR_EXEC; } return (response->cc == ISULAD_SUCCESS) ? 0 : -1; } private: void protobuf_timestamp_to_grpc(const types_timestamp_t *timestamp, Timestamp *gtimestamp) { gtimestamp->set_seconds(timestamp->seconds); gtimestamp->set_nanos(timestamp->nanos); } void protobuf_timestamp_from_grpc(types_timestamp_t *timestamp, const Timestamp >imestamp) { timestamp->has_seconds = gtimestamp.seconds() != 0; timestamp->seconds = gtimestamp.seconds(); timestamp->has_nanos = gtimestamp.nanos() != 0; timestamp->nanos = gtimestamp.nanos(); } void event_from_grpc(container_events_format_t *event, Event *gevent) { (void)memset(event, 0, sizeof(*event)); if (!gevent->id().empty()) { event->id = (char *)gevent->id().c_str(); } event->has_type = true; event->type = (container_events_type_t)((int)gevent->type()); event->has_pid = (int)gevent->pid() != -1; event->pid = (uint32_t)gevent->pid(); event->has_exit_status = true; event->exit_status = gevent->exit_status(); if (gevent->has_timestamp()) { protobuf_timestamp_from_grpc(&event->timestamp, gevent->timestamp()); } } int events_request_to_grpc(const struct isula_events_request *request, EventsRequest *grequest) { if (request == nullptr) { return -1; } grequest->set_storeonly(request->storeonly); if (request->id != nullptr) { grequest->set_id(request->id); } if (request->since.has_seconds || request->since.has_nanos) { protobuf_timestamp_to_grpc((const types_timestamp_t *)(&request->since), grequest->mutable_since()); } if (request->until.has_seconds || request->until.has_nanos) { protobuf_timestamp_to_grpc((const types_timestamp_t *)(&request->until), grequest->mutable_until()); } return 0; } }; struct CopyFromContainerContext { CopyFromContainerRequest request; ClientContext context; ClientReader *reader; }; // Note: len of buf can not smaller than ARCHIVE_BLOCK_SIZE static ssize_t CopyFromContainerRead(void *context, void *buf, size_t len) { CopyFromContainerResponse res; struct CopyFromContainerContext *gcopy = (struct CopyFromContainerContext *)context; if (!gcopy->reader->Read(&res)) { return -1; } size_t data_len = res.data().length(); if (data_len <= len) { (void)memcpy(buf, res.data().c_str(), data_len); return (ssize_t)data_len; } return -1; } static int CopyFromContainerFinish(void *context, char **err) { struct CopyFromContainerContext *gcopy = (struct CopyFromContainerContext *)context; CopyFromContainerResponse res; if (gcopy->reader->Read(&res)) { // Connection still alive, cancel it gcopy->context.TryCancel(); gcopy->reader->Finish(); } else { Status status = gcopy->reader->Finish(); if (!status.ok()) { ERROR("error_code: %d: %s", status.error_code(), status.error_message().c_str()); if (!status.error_message().empty() && (status.error_code() == StatusCode::UNKNOWN || status.error_code() == StatusCode::PERMISSION_DENIED || status.error_code() == grpc::StatusCode::INTERNAL)) { *err = util_strdup_s(status.error_message().c_str()); } else { *err = util_strdup_s(errno_to_error_message(ISULAD_ERR_CONNECT)); } return -1; } } delete gcopy->reader; delete gcopy; return 0; } class CopyFromContainer : public ClientBase { public: explicit CopyFromContainer(void *args) : ClientBase(args) { } ~CopyFromContainer() = default; int run(const struct isula_copy_from_container_request *request, struct isula_copy_from_container_response *response) override { int ret; CopyFromContainerResponse res; struct CopyFromContainerContext *ctx = new (std::nothrow)(struct CopyFromContainerContext); if (ctx == nullptr) { return -1; } ret = copy_from_container_request_to_grpc(request, &ctx->request); if (ret != 0) { ERROR("Failed to translate request to grpc"); response->server_errono = ISULAD_ERR_INPUT; delete ctx; return -1; } // Set common name from cert.perm char common_name_value[ClientBaseConstants::COMMON_NAME_LEN] = { 0 }; ret = get_common_name_from_tls_cert(m_certFile.c_str(), common_name_value, ClientBaseConstants::COMMON_NAME_LEN); if (ret) { ERROR("Failed to get common name in: %s", m_certFile.c_str()); return -1; } ctx->context.AddMetadata("username", std::string(common_name_value, strlen(common_name_value))); ctx->context.AddMetadata("tls_mode", m_tlsMode); auto reader = stub_->CopyFromContainer(&ctx->context, ctx->request); reader->WaitForInitialMetadata(); ctx->reader = reader.release(); auto metadata = ctx->context.GetServerInitialMetadata(); auto stat = metadata.find("isulad-container-path-stat"); if (stat != metadata.end()) { char *err = nullptr; std::string json = std::string(stat->second.data(), stat->second.length()); response->stat = container_path_stat_parse_data(json.c_str(), nullptr, &err); if (response->stat == nullptr) { ERROR("Invalid json: %s", err); free(err); CopyFromContainerFinish(ctx, &response->errmsg); return -1; } free(err); } else { CopyFromContainerFinish(ctx, &response->errmsg); return -1; } // Ignore the first reader which is used for transform metadata ctx->reader->Read(&res); response->reader.context = (void *)ctx; response->reader.read = CopyFromContainerRead; response->reader.close = CopyFromContainerFinish; return 0; } private: int copy_from_container_request_to_grpc(const struct isula_copy_from_container_request *request, CopyFromContainerRequest *grequest) { if (request == nullptr) { return -1; } if (request->runtime != nullptr) { grequest->set_runtime(request->runtime); } if (request->id != nullptr) { grequest->set_id(request->id); } if (request->srcpath != nullptr) { grequest->set_srcpath(request->srcpath); } return 0; } }; class CopyToContainerWriteToServerTask : public StoppableThread { public: explicit CopyToContainerWriteToServerTask( const struct io_read_wrapper *reader, std::shared_ptr> stream) : m_reader(reader), m_stream(stream) { } ~CopyToContainerWriteToServerTask() = default; void run() { size_t len = ARCHIVE_BLOCK_SIZE; char *buf = (char *)util_common_calloc_s(len); if (buf == nullptr) { ERROR("Out of memory"); m_stream->WritesDone(); return; } while (stopRequested() == false) { ssize_t have_read_len = m_reader->read(m_reader->context, buf, len); CopyToContainerRequest request; request.set_data((const void*)buf, (size_t)have_read_len); if (!m_stream->Write(request)) { DEBUG("Server may be exited, stop send data"); break; } } free(buf); m_stream->WritesDone(); } private: const struct io_read_wrapper *m_reader; std::shared_ptr> m_stream; }; class CopyToContainer : public ClientBase { public: explicit CopyToContainer(void *args) : ClientBase(args) { } ~CopyToContainer() = default; int set_custom_header_metadata(ClientContext &context, const struct isula_copy_to_container_request *request, struct isula_copy_to_container_response *response) { int ret = 0; char *json = nullptr; char *err = nullptr; container_copy_to_request copy = { 0 }; struct parser_context ctx = { OPT_GEN_SIMPLIFY, 0 }; // Set common name from cert.perm char common_name_value[ClientBaseConstants::COMMON_NAME_LEN] = { 0 }; if (request == nullptr || request->id == nullptr) { ERROR("Missing container id in the request"); return -1; } copy.id = request->id; copy.runtime = request->runtime; copy.src_path = request->srcpath; copy.src_isdir = request->srcisdir; copy.src_rebase_name = request->srcrebase; copy.dst_path = request->dstpath; json = container_copy_to_request_generate_json(©, &ctx, &err); if (json == nullptr) { format_errorf(&response->errmsg, "Can not generate json: %s", err); ret = -1; goto out; } ret = get_common_name_from_tls_cert(m_certFile.c_str(), common_name_value, ClientBaseConstants::COMMON_NAME_LEN); if (ret) { ERROR("Failed to get common name in: %s", m_certFile.c_str()); ret = -1; goto out; } context.AddMetadata("username", std::string(common_name_value, strlen(common_name_value))); context.AddMetadata("tls_mode", m_tlsMode); context.AddMetadata("isulad-copy-to-container", json); out: free(err); free(json); return ret; } int run(const struct isula_copy_to_container_request *request, struct isula_copy_to_container_response *response) override { ClientContext context; if (set_custom_header_metadata(context, request, response) != 0) { ERROR("Failed to translate request to grpc"); response->cc = ISULAD_ERR_INPUT; return -1; } using StreamRSharedPtr = std::shared_ptr>; StreamRSharedPtr stream(stub_->CopyToContainer(&context)); CopyToContainerWriteToServerTask write_task(&request->reader, stream); std::thread writer([&]() { write_task.run(); }); CopyToContainerResponse stream_response; while (stream->Read(&stream_response)) { if (stream_response.finish()) { break; } } write_task.stop(); writer.join(); Status status = stream->Finish(); if (!status.ok()) { ERROR("error_code: %d: %s", status.error_code(), status.error_message().c_str()); unpackStatus(status, response); return -1; } return 0; } }; class ContainerLogs : public ClientBase { public: explicit ContainerLogs(void *args) : ClientBase(args) { } ~ContainerLogs() = default; int run(const struct isula_logs_request *request, struct isula_logs_response *response) override { ClientContext context; LogsRequest grequest; int ret = -1; // Set common name from cert.perm char common_name_value[ClientBaseConstants::COMMON_NAME_LEN] = { 0 }; ret = get_common_name_from_tls_cert(m_certFile.c_str(), common_name_value, ClientBaseConstants::COMMON_NAME_LEN); if (ret != 0) { ERROR("Failed to get common name in: %s", m_certFile.c_str()); return -1; } context.AddMetadata("username", std::string(common_name_value, strlen(common_name_value))); context.AddMetadata("tls_mode", m_tlsMode); if (logs_request_to_grpc(request, &grequest) != 0) { ERROR("Failed to transform container request to grpc"); response->server_errono = ISULAD_ERR_INPUT; return -1; } auto reader = stub_->Logs(&context, grequest); LogsResponse gresponse; while (reader->Read(&gresponse)) { show_container_log(request, gresponse); } Status status = reader->Finish(); if (!status.ok()) { ERROR("error code: %d: %s", status.error_code(), status.error_message().c_str()); unpackStatus(status, response); return -1; } return 0; } private: void show_container_log(const struct isula_logs_request *request, const LogsResponse &gresponse) { static std::ostream *os = nullptr; if (gresponse.stream() == "stdout") { os = &std::cout; } else if (gresponse.stream() == "stderr") { os = &std::cerr; } else { ERROR("Invalid container log: %s", gresponse.stream().c_str()); return; } if (request->timestamps) { (*os) << gresponse.time() << " "; } (*os) << gresponse.data(); } int logs_request_to_grpc(const struct isula_logs_request *request, LogsRequest *grequest) { if (request == nullptr) { return -1; } if (request->id != nullptr) { grequest->set_id(request->id); } if (request->runtime != nullptr) { grequest->set_runtime(request->runtime); } if (request->since != nullptr) { grequest->set_since(request->since); } if (request->until != nullptr) { grequest->set_until(request->until); } grequest->set_timestamps(request->timestamps); grequest->set_follow(request->follow); grequest->set_tail(request->tail); grequest->set_details(request->details); return 0; } }; int grpc_containers_client_ops_init(isula_connect_ops *ops) { if (ops == nullptr) { return -1; } // implement following interface ops->container.version = container_func; ops->container.info = container_func; ops->container.create = container_func; ops->container.start = container_func; ops->container.remote_start = container_func; ops->container.stop = container_func; ops->container.restart = container_func; ops->container.remove = container_func; ops->container.list = container_func; ops->container.exec = container_func; ops->container.remote_exec = container_func; ops->container.attach = container_func; ops->container.pause = container_func; ops->container.resume = container_func; ops->container.update = container_func; ops->container.kill = container_func; ops->container.stats = container_func; ops->container.wait = container_func; ops->container.events = container_func; ops->container.inspect = container_func; ops->container.export_rootfs = container_func; ops->container.copy_from_container = container_func; ops->container.copy_to_container = container_func; ops->container.top = container_func; ops->container.rename = container_func; ops->container.resize = container_func; ops->container.logs = container_func; return 0; }