Skip to content

Commit 3107077

Browse files
authored
[CWM-180] Fix last action updates. (#4)
* [CWM-180] Fix last action updates. Signed-off-by: Azeem Sajid <azeem.sajid@gmail.com> * Add queue and timer for last action updates.
1 parent 997bdb8 commit 3107077

File tree

2 files changed

+71
-34
lines changed

2 files changed

+71
-34
lines changed

README.md

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,17 @@
1010
![GitHub code size in bytes](https://img.shields.io/github/languages/code-size/iamAzeem/fluent-plugin-http-cwm?style=flat-square)
1111
![GitHub repo size](https://img.shields.io/github/repo-size/iamAzeem/fluent-plugin-http-cwm?style=flat-square)
1212

13+
- [Overview](#overview)
14+
- [Installation](#installation)
15+
- [RubyGems](#rubygems)
16+
- [Bundler](#bundler)
17+
- [Configuration](#configuration)
18+
- [`<redis>` section (optional) (single)](#redis-section-optional-single)
19+
- [Sample Configuration](#sample-configuration)
20+
- [Contribute](#contribute)
21+
- [Publish the gem](#publish-the-gem)
22+
- [License](#license)
23+
1324
## Overview
1425

1526
[Fluentd](https://fluentd.org/) HTTP input plugin for
@@ -100,7 +111,7 @@ bundle
100111
- Default value: `6379`.
101112
- `db` (integer) (optional): The db to use.
102113
- Default value: `0`.
103-
- `grace_period` (time) (optional): The grace period for last update.
114+
- `grace_period` (time) (optional): The grace period for last action update.
104115
- Default value: `300s`.
105116
- `flush_interval` (time) (optional): The flush interval to send metrics.
106117
- Default value: `300s`.

lib/fluent/plugin/in_http_cwm.rb

Lines changed: 59 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ class CwmHttpInput < Fluent::Plugin::Input
5353
desc 'The db to use.'
5454
config_param :db, :integer, default: 0
5555

56-
desc 'The grace period for last update.'
56+
desc 'The grace period for last action update.'
5757
config_param :grace_period, :time, default: '300s'
5858

5959
desc 'The flush interval to send metrics.'
@@ -71,6 +71,9 @@ def initialize
7171

7272
@redis = nil
7373
@deployment_api_metrics = default_api_metrics_hash
74+
75+
@last_action_queue = Queue.new
76+
@last_action_entry = []
7477
end
7578

7679
def configure(conf)
@@ -82,18 +85,29 @@ def configure(conf)
8285
def start
8386
super
8487

85-
# set up timer for flush interval
86-
timer_execute(:metrics_flush_timer, @redis_config.flush_interval) do
88+
# start interval timer to flush api metrics
89+
timer_execute(:api_metrics_flush_timer, @redis_config.flush_interval) do
8790
flush_api_metrics
8891
end
8992

93+
# start interval timer to flush last action entry
94+
timer_execute(:last_action_flush_timer, '1s') do
95+
if @last_action_entry.empty?
96+
@last_action_entry = @last_action_queue.deq.split('|')
97+
log.debug("Dequed last action entry. #{@last_action_entry}")
98+
else
99+
deploymentid, last_action = @last_action_entry
100+
@last_action_entry = [] if update_deployment_last_action(deploymentid, last_action)
101+
end
102+
end
103+
90104
log.info("Starting HTTP server [#{@host}:#{@port}]...")
91105
http_server_create_http_server(:http_server, addr: @host, port: @port, logger: log) do |server|
92106
server.post("/#{tag}") do |req|
93107
data = parse_data(req.body)
94108
route(data) if update_deployment_metrics(data)
95109

96-
# return HTTP 200 OK response to MinIO
110+
# return HTTP 200 OK response with emtpy body
97111
[200, { 'Content-Type' => 'text/plain' }, nil]
98112
end
99113
end
@@ -141,40 +155,52 @@ def route(data)
141155
router.emit(@tag, time, record)
142156
end
143157

144-
def days_to_seconds(days)
145-
days * 24 * 60 * 60
158+
def datetime_diff_in_secs(dt_begin, dt_end)
159+
seconds = ((dt_end - dt_begin) * 24 * 60 * 60)
160+
seconds.to_i
146161
end
147162

148-
def update_deployment_last_action(deploymentid)
149-
log.debug('Updating deployment last action')
150-
163+
def update_deployment_last_action(deploymentid, last_action)
151164
key = "#{@redis_config.last_update_prefix}:#{deploymentid}"
152-
curdt = DateTime.now
153-
154-
begin
155-
log.debug("checking existing last action entry [key: #{key}]")
156-
lastval = @redis.get(key)
157-
158-
is_grace_period_expired = false
159-
if lastval
160-
lastdt = DateTime.parse(lastval, FMT_DATETIME)
161-
dt_diff_secs = days_to_seconds((curdt - lastdt).to_f)
162-
if dt_diff_secs > @redis_config.grace_period
163-
is_grace_period_expired = true
164-
log.debug("grace period [#{@redis_config.grace_period}] expired [#{lastdt} => #{curdt}]")
165-
end
166-
else
167-
log.debug('last action entry not found. going to be set for the first time.')
165+
log.debug("Checking existing last action entry [key: #{key}]")
166+
lastval = @redis.get(key)
167+
168+
is_grace_period_expired = false
169+
if lastval
170+
curdt = DateTime.now
171+
lastdt = DateTime.parse(lastval, FMT_DATETIME)
172+
dt_diff_secs = datetime_diff_in_secs(lastdt, curdt)
173+
log.debug("Current Data/Time: #{curdt}")
174+
log.debug("Previous Date/Time: #{lastdt}")
175+
log.debug("Date/Time diff (s): #{dt_diff_secs}")
176+
177+
if dt_diff_secs >= @redis_config.grace_period
178+
is_grace_period_expired = true
179+
log.debug("Grace period expired for last action update. [#{@redis_config.grace_period}]")
168180
end
181+
else
182+
log.debug('Last action entry does not exist. It will be set for the first time.')
183+
end
169184

170-
if lastdt.nil? || is_grace_period_expired
171-
last_action = curdt.strftime(FMT_DATETIME)
172-
@redis.set(key, last_action)
173-
log.debug("Updated last action entry [#{key} => #{last_action}]")
174-
end
175-
rescue StandardError => e
176-
log.error("Unable to update last action! ERROR: '#{e}'.")
185+
if lastdt.nil? || is_grace_period_expired
186+
log.debug('Updating deployment last action')
187+
last_action = DateTime.parse(last_action, FMT_DATETIME)
188+
@redis.set(key, last_action)
189+
log.debug("Updated last action entry [#{key} => #{last_action}]")
190+
true
191+
else
192+
false
177193
end
194+
rescue StandardError => e
195+
log.error("Unable to update last action! ERROR: '#{e}'.")
196+
false
197+
end
198+
199+
def enque_last_action_entry(deploymentid)
200+
last_action = DateTime.now
201+
entry = "#{deploymentid}|#{last_action}"
202+
@last_action_queue.enq(entry)
203+
log.debug("Enqued last action entry. [#{entry}]")
178204
end
179205

180206
def validate_and_get_value(data_hash, key)
@@ -191,7 +217,7 @@ def update_deployment_metrics(data)
191217
deploymentid = validate_and_get_value(data, 'deploymentid')
192218
return false unless deploymentid
193219

194-
update_deployment_last_action(deploymentid)
220+
enque_last_action_entry(deploymentid)
195221

196222
api_data = validate_and_get_value(data, 'api')
197223
return false unless api_data

0 commit comments

Comments
 (0)