apk build
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

309 lines
9.4 KiB

#!/usr/bin/env ruby
# -------------------------------------------------------------------------- #
# Copyright 2002-2023, OpenNebula Project, OpenNebula Systems #
# #
# Licensed under the Apache License, Version 2.0 (the "License"); you may #
# not use this file except in compliance with the License. You may obtain #
# a copy of the License at #
# #
# http://www.apache.org/licenses/LICENSE-2.0 #
# #
# Unless required by applicable law or agreed to in writing, software #
# distributed under the License is distributed on an "AS IS" BASIS, #
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
# See the License for the specific language governing permissions and #
# limitations under the License. #
#--------------------------------------------------------------------------- #
# frozen_string_literal: true
# rubocop:disable Lint/MissingCopEnableDirective
# rubocop:disable Lint/RedundantRequireStatement
# rubocop:disable Layout/FirstHashElementIndentation
# rubocop:disable Layout/HashAlignment
# rubocop:disable Layout/HeredocIndentation
# rubocop:disable Layout/IndentationWidth
# rubocop:disable Style/HashSyntax
# rubocop:disable Style/ParallelAssignment
# Installation layout. When the ONE_LOCATION environment variable is set,
# every path is derived from it; otherwise fixed system-wide paths are used.
ONE_LOCATION = ENV['ONE_LOCATION']

if ONE_LOCATION
    RUBY_LIB_LOCATION = "#{ONE_LOCATION}/lib/ruby"
    GEMS_LOCATION     = "#{ONE_LOCATION}/share/gems"
    ETC_LOCATION      = "#{ONE_LOCATION}/etc"
    REMOTES_LOCATION  = "#{ONE_LOCATION}/var/remotes"
else
    RUBY_LIB_LOCATION = '/usr/lib/one/ruby'
    GEMS_LOCATION     = '/usr/share/one/gems'
    ETC_LOCATION      = '/etc/one'
    REMOTES_LOCATION  = '/var/tmp/one'
end

# The proxy configuration file is located below the remotes directory.
CONFIGURATION_FILE = "#{REMOTES_LOCATION}/etc/onegate-proxy.conf"
# %%RUBYGEMS_SETUP_BEGIN%%
# Point RubyGems at the directory in GEMS_LOCATION, provided that directory
# exists and is not already the only entry in Gem.path.
if File.directory?(GEMS_LOCATION)
# Resolve symlinks so that the comparison with Gem.path uses the real path.
real_gems_path = File.realpath(GEMS_LOCATION)
if !defined?(Gem) || Gem.path != [real_gems_path]
# Remove vendor_ruby entries from the load path before switching gem paths.
$LOAD_PATH.reject! {|p| p =~ /vendor_ruby/ }
# Suppress warnings from Rubygems
# https://github.com/OpenNebula/one/issues/5379
begin
# Save the current verbosity; the ensure clause restores it even when
# loading RubyGems or switching the paths raises.
verb = $VERBOSE
$VERBOSE = nil
require 'rubygems'
Gem.use_paths(real_gems_path)
ensure
$VERBOSE = verb
end
end
end
# %%RUBYGEMS_SETUP_END%%
$LOAD_PATH << RUBY_LIB_LOCATION
require 'async/io'
require 'async/io/stream'
require 'async/io/trap'
require 'etc'
require 'pp'
require 'rb-inotify'
require 'socket'
require 'yaml'
$stdout.sync = true
$stderr.sync = true
# Built-in settings; values loaded from CONFIGURATION_FILE are merged on
# top of these by the script entry point.
DEFAULT_OPTIONS = {
    # Verbosity: 0 = ERROR, 1 = WARNING, 2 = INFO, 3 = DEBUG
    :debug_level   => 2,
    # Account the process switches to before serving connections
    :process_owner => 'oneadmin',
    # Address and port of the OneGate server the traffic is relayed to;
    # the same port is used for the listening socket
    :onegate_addr  => '127.0.0.1',
    :onegate_port  => '5030',
    # Address the proxy binds its listening socket to
    :service_addr  => '169.254.16.9'
}.freeze
# Adapter that translates OpenNebula debug levels into the levels of the
# socketry/console library and fans messages out to two console loggers:
# one attached to $stdout (info) and one attached to $stderr (error, warn
# and debug).
class Logger

    # OpenNebula debug level => value passed to console as CONSOLE_LEVEL.
    LOG_LEVEL_MAP = {
        0 => '3', # ERROR
        1 => '2', # WARN
        2 => '1', # INFO
        3 => '0'  # DEBUG
    }.freeze

    # @param log_level [Integer] OpenNebula debug level (see LOG_LEVEL_MAP)
    def initialize(log_level = 2)
        @out = console_for $stdout, log_level
        @err = console_for $stderr, log_level
    end

    # Generate one forwarding method per severity. Each of them hands its
    # arguments and block over, untouched, to the same-named method of the
    # console logger it is routed to.
    {
        :error => :@err,
        :warn  => :@err,
        :info  => :@out,
        :debug => :@err
    }.each do |severity, sink|
        define_method(severity) do |*args, &block|
            instance_variable_get(sink).public_send(severity, *args, &block)
        end
    end

    private

    # Builds a console logger writing to +io+, configured with the
    # CONSOLE_LEVEL that corresponds to +log_level+.
    def console_for(io, log_level)
        Console::Logger.default_logger io, {
            'CONSOLE_LEVEL' => LOG_LEVEL_MAP[log_level]
        }
    end

end
# Class that implements a classic two-way TCP socket proxy (async).
#
# Connections accepted on service_addr:onegate_port are relayed to
# onegate_addr:onegate_port, copying bytes in both directions.
class OneGateProxy

    # Prepares the SIGINT trap, the configuration file watcher and the
    # listening socket.
    #
    # @param options [Hash] overrides merged on top of DEFAULT_OPTIONS; every
    #   resulting key is also stored in an instance variable of the same name
    def initialize(options = {})
        @options = DEFAULT_OPTIONS.merge options

        @options.each {|k, v| instance_variable_set("@#{k}", v) }

        # Take the level from the merged options, so that the default one
        # applies when the caller did not provide :debug_level.
        @logger = Logger.new @debug_level

        @sigint = Async::IO::Trap.new :INT
        @sigint.install!

        @inotify    = setup_inotify
        @inotify_io = Async::IO::Generic.new @inotify.to_io

        @proxy_ep = Async::IO::Endpoint.socket setup_socket
    end

    # Switches to the configured process owner and then serves connections
    # inside an Async reactor.
    def run
        # NOTE: At this point all config should be set in stone,
        #       we can drop root privileges..
        drop_privileges

        Async do |task|
            # Make CTRL-C work..
            task.async do
                @sigint.wait { exit 0 }
            end

            # Handle filesystem notifications..
            task.async do
                @inotify.process while @inotify_io.wait_readable
            end

            glue_peers task
        end
    end

    private

    # Changes the group and user IDs of the process to the ones of
    # @process_owner.
    def drop_privileges
        owner = Etc.getpwnam @process_owner

        @logger.info(self) do
            "Drop root privileges -> #{@process_owner}"
        end

        # The group is changed first: once the user ID has been dropped the
        # process may no longer be permitted to change its group ID.
        Process::Sys.setgid owner.gid
        Process::Sys.setuid owner.uid
    end

    # Watches CONFIGURATION_FILE for modifications; the callback terminates
    # the process. If the file does not exist the error is logged and the
    # process exits with the ENOENT errno as its status.
    #
    # @return [INotify::Notifier]
    def setup_inotify
        inotify = INotify::Notifier.new
        inotify.watch(CONFIGURATION_FILE, :modify) do
            @logger.info(self) do
                "#{CONFIGURATION_FILE} has been just updated, exiting.."
            end
            # We assume here that the service will be restarted by
            # the service manager.
            exit 0
        end
        inotify
    rescue Errno::ENOENT => e
        @logger.error(self) do
            e.message
        end
        # We assume here that the service will be restarted by
        # the service manager.
        exit e.class::Errno
    end

    # Makes sure @service_addr (as a /32) is configured on the loopback
    # device, adding it only when the lookup command prints nothing.
    def setup_service_addr
        # NOTE: We need the service_addr to be defined on one of the interfaces
        #       inside the host, one natural choice is the loopback interface (lo).
        #       Effectively we set it once, subsequent restarts of the service should
        #       honor the idempotence.
        ip_address_add_cmd = lambda do |cidr_host, nic_device|
            check = "[ -n \"$(ip a s to '#{cidr_host}' dev '#{nic_device}')\" ]"
            apply = "ip a a '#{cidr_host}' dev '#{nic_device}'"
            "#{check.strip} >/dev/null 2>&1 || #{apply.strip}"
        end

        system ip_address_add_cmd.call "#{@service_addr}/32", 'lo'
    end

    # Creates the listening TCP socket bound to @service_addr:@onegate_port.
    #
    # @param listen [Integer] backlog passed to listen()
    # @return [Socket]
    def setup_socket(listen = Socket::SOMAXCONN)
        # NOTE: Must be executed before calling bind(), otherwise it fails..
        setup_service_addr

        sock = Socket.new Socket::AF_INET, Socket::SOCK_STREAM, 0
        sock.setsockopt Socket::SOL_SOCKET, Socket::SO_REUSEADDR, 1

        @logger.info(self) do
            "Bind #{Addrinfo.tcp(@service_addr, @onegate_port).inspect}"
        end

        sock.bind Socket.pack_sockaddr_in(@onegate_port, @service_addr)
        sock.listen listen
        sock
    end

    # Copies data between the two streams in both directions, one child
    # task of +task+ per direction.
    #
    # @return the outer Async task; waiting on it returns after both
    #   directions have finished
    def glue_streams(stream1, stream2, task)
        Async do
            relays = [
                task.async { pump stream1, stream2 },
                task.async { pump stream2, stream1 }
            ]
            relays.each(&:wait)
        end
    end

    # Forwards chunks from +source+ to +sink+, flushing after each one,
    # until read_partial returns a falsy value.
    def pump(source, sink)
        while (chunk = source.read_partial)
            sink.write chunk
            sink.flush
        end
    end

    # Accepts VM connections and relays each of them to a new connection
    # towards @onegate_addr:@onegate_port.
    def glue_peers(task)
        @proxy_ep.accept do |vm_peer|
            @logger.debug(self) do
                "Accept #{vm_peer.remote_address.inspect}"
            end

            begin
                gate_ep = Async::IO::Endpoint.tcp @onegate_addr,
                                                  @onegate_port

                gate_ep.connect do |gate_peer|
                    vm_stream   = Async::IO::Stream.new vm_peer
                    gate_stream = Async::IO::Stream.new gate_peer

                    glue_streams(vm_stream, gate_stream, task).wait

                    @logger.debug(self) do
                        "Close #{gate_peer.remote_address.inspect}"
                    end

                    gate_peer.close
                end
            # EPIPE is raised when writing to a peer that has already
            # closed its end; treat it like the other connection errors.
            rescue Errno::ECONNREFUSED,
                   Errno::ECONNRESET,
                   Errno::EHOSTUNREACH,
                   Errno::ETIMEDOUT,
                   Errno::EPIPE => e
                @logger.error(self) do
                    e.message
                end
            end

            @logger.debug(self) do
                "Close #{vm_peer.remote_address.inspect}"
            end

            vm_peer.close
        end
    end

end
# Script entry point: the body only runs when the call stack is empty,
# i.e. when this file is executed directly.
if caller.empty?
    # NOTE: The "CONFIGURATION_FILE" is updated during the host sync procedure.
    # Settings from the file take precedence over DEFAULT_OPTIONS; any
    # failure while loading or merging them aborts the start-up.
    options =
        begin
            DEFAULT_OPTIONS.merge YAML.load_file(CONFIGURATION_FILE)
        rescue StandardError => e
            warn "Error parsing config file #{CONFIGURATION_FILE}: #{e.message}"
            exit 1
        end

    # Print the effective configuration before starting the service.
    puts <<~HEADER
        --------------------------------------
        Proxy configuration
        --------------------------------------
        #{options.pretty_inspect.strip}
        --------------------------------------
    HEADER

    OneGateProxy.new(options).run
end