In-place version upgrades for Apache Flink - Managed Service for Apache Flink

Amazon Managed Service for Apache Flink was previously known as Amazon Kinesis Data Analytics for Apache Flink.

In-place version upgrades for Apache Flink

You can use in-place version upgrades for Apache Flink to upgrade existing applications to a new Flink version in Amazon Managed Service for Apache Flink. With in-place version upgrades, you retain application traceability against a single ARN across Apache Flink versions. This includes snapshots, logs, metrics, tags, Flink configurations, resource limit increases, VPCs, and more. To perform this task, you can use the AWS CLI, AWS CloudFormation, AWS SDK, or the AWS Management Console.

Note

You can't use in-place version upgrades for Apache Flink with Amazon Managed Service for Apache Flink Studio.

Upgrading applications using in-place version upgrades for Apache Flink

Before you begin, we recommend that you watch this video: In-Place Version Upgrades.

To perform in-place version upgrades for Apache Flink, you can use the AWS CLI, AWS CloudFormation, AWS SDK, or the AWS Management Console. You can use this feature with any existing application in a READY or RUNNING state in Managed Service for Apache Flink. The feature extends the UpdateApplication API with the ability to change the Flink runtime.
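The update-application examples later on this page pass `${current_application_version}` for --current-application-version-id. You can read that value, along with the current runtime, from describe-application output. A minimal sketch of the parsing step; the sample JSON below is illustrative, not real output:

```python
import json

# Illustrative sample of `aws kinesisanalyticsv2 describe-application` output;
# only the fields used below are shown.
sample = json.dumps({
    "ApplicationDetail": {
        "ApplicationName": "UpgradeTest",
        "ApplicationVersionId": 7,
        "RuntimeEnvironment": "FLINK-1_15",
        "ApplicationStatus": "RUNNING",
    }
})

def current_version_and_runtime(describe_output):
    # Extract the fields you need before calling update-application.
    detail = json.loads(describe_output)["ApplicationDetail"]
    return detail["ApplicationVersionId"], detail["RuntimeEnvironment"]

print(current_version_and_runtime(sample))  # (7, 'FLINK-1_15')
```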

Before upgrading: Updating your Apache Flink application

When you write your Flink applications, you bundle them with their dependencies into an application JAR and upload the JAR to your Amazon S3 bucket. From there, Amazon Managed Service for Apache Flink runs the job in the new Flink runtime that you selected. You might have to update your applications to achieve compatibility with the Flink runtime you want to upgrade to. There can be inconsistencies between Flink versions that cause the version upgrade to fail. Most commonly, these involve connectors for sources (ingress) or destinations (sinks, egress) and Scala dependencies. Flink 1.15 and later versions in Managed Service for Apache Flink are Scala-agnostic, so your JAR must include the version of Scala you plan to use.

To update your application

  1. Read the advice from the Flink community on upgrading applications with state. See Upgrading Applications and Flink Versions.

  2. Read the list of known issues and limitations. See Precautions and known issues.

  3. Update your dependencies and test your applications locally. These dependencies typically are:

    1. The Flink runtime and API.

    2. Connectors recommended for the new Flink runtime. You can find these on the Release versions page for the specific runtime you want to upgrade to.

    3. Scala – Apache Flink is Scala-agnostic starting with and including Flink 1.15. You must include the Scala dependencies you want to use in your application JAR.

  4. Build a new application JAR or zip file and upload it to Amazon S3. We recommend that you use a different name from the previous JAR/zip file. If you need to roll back, you will use this information.

  5. If you are running stateful applications, we strongly recommend that you take a snapshot of your current application. This lets you roll back statefully if you encounter issues during or after the upgrade.
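For step 5, you can take the snapshot from the CLI. A hedged sketch that builds the request body for create-application-snapshot, which you could pass with the CLI's --cli-input-json parameter; the application and snapshot names below are placeholders:

```python
import json

# Request body for:
#   aws kinesisanalyticsv2 create-application-snapshot --cli-input-json file://snapshot.json
# ApplicationName and SnapshotName are placeholder values.
params = {
    "ApplicationName": "UpgradeTest",
    "SnapshotName": "pre-upgrade-flink-1-15",
}
body = json.dumps(params, indent=2)
print(body)
```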

Upgrading your application to a new Apache Flink version

You can upgrade your Flink application by using the UpdateApplication action.

You can call the UpdateApplication API in multiple ways:

  • Use the existing Configuration workflow on the AWS Management Console.

    • Go to your app page on the AWS Management Console.

    • Choose Configure.

    • Select the new runtime and the snapshot that you want to start from, also known as the restore configuration. Use the latest setting as the restore configuration to start the app from the latest snapshot. Point to the new upgraded application JAR/zip file on Amazon S3.

  • Use the AWS CLI update-application action.

  • Use AWS CloudFormation (CFN).

    • Update the RuntimeEnvironment field. Previously, AWS CloudFormation deleted the application and created a new one, causing your snapshots and other app history to be lost. Now AWS CloudFormation updates your RuntimeEnvironment in place and does not delete your application.

  • Use the AWS SDK.

    • Consult the SDK documentation for the programming language of your choice. See UpdateApplication.

You can perform the upgrade while the application is in RUNNING state or while it is stopped in READY state. Amazon Managed Service for Apache Flink runs a validation check to verify compatibility between the original runtime version and the target runtime version. This compatibility check runs when you perform UpdateApplication while in RUNNING state, or at the next StartApplication if you upgrade while in READY state.

The following example shows upgrading an app in RUNNING state named UpgradeTest to Flink 1.18 in US East (N. Virginia) using the AWS CLI and starting the upgraded app from the latest snapshot.

aws --region us-east-1 kinesisanalyticsv2 update-application \
--application-name UpgradeTest --runtime-environment-update "FLINK-1_18" \
--application-configuration-update '{"ApplicationCodeConfigurationUpdate": '\
'{"CodeContentUpdate": {"S3ContentLocationUpdate": '\
'{"FileKeyUpdate": "flink_1_18_app.jar"}}}}' \
--run-configuration-update '{"ApplicationRestoreConfiguration": '\
'{"ApplicationRestoreType": "RESTORE_FROM_LATEST_SNAPSHOT"}}' \
--current-application-version-id ${current_application_version}
  • If you enabled service snapshots and want to continue the application from the latest snapshot, Amazon Managed Service for Apache Flink verifies that the current RUNNING application's runtime is compatible with the selected target runtime.

  • If you have specified a snapshot from which to continue the target runtime, Amazon Managed Service for Apache Flink verifies that the target runtime is compatible with the specified snapshot. If the compatibility check fails, your update request is rejected and your application remains untouched in the RUNNING state.

  • If you choose to start your application without a snapshot, Amazon Managed Service for Apache Flink doesn't run any compatibility checks.

  • If your upgraded application fails or gets stuck in a transient UPDATING state, follow the instructions in the Rollback section to return it to a healthy state.
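After issuing the update on a running application, you typically poll describe-application until the status settles. A small sketch of that decision logic, assuming the ApplicationStatus values named above (RUNNING, UPDATING, AUTOSCALING):

```python
def next_action(status):
    # Map an ApplicationStatus value to what to do next after an upgrade attempt.
    if status == "RUNNING":
        return "done"
    if status in ("UPDATING", "AUTOSCALING"):
        # Still transitioning; keep polling. If the application stays here,
        # roll back as described in the Rollback section.
        return "keep polling"
    return "investigate"  # a failed or otherwise unexpected state

print(next_action("UPDATING"))
print(next_action("RUNNING"))
```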

Process flow for running state applications


The following diagram represents the recommended workflow to upgrade the application while it is running. We assume that the application is stateful and that you enabled snapshots. For this workflow, on update, you restore the application from the latest snapshot, which Amazon Managed Service for Apache Flink automatically takes before updating.

The following example shows upgrading an app in READY state named UpgradeTest to Flink 1.18 in US East (N. Virginia) using the AWS CLI. No snapshot is specified to start the app because the application is not running; you can specify a snapshot when you issue the start application request.

aws --region us-east-1 kinesisanalyticsv2 update-application \
--application-name UpgradeTest --runtime-environment-update "FLINK-1_18" \
--application-configuration-update '{"ApplicationCodeConfigurationUpdate": '\
'{"CodeContentUpdate": {"S3ContentLocationUpdate": '\
'{"FileKeyUpdate": "flink_1_18_app.jar"}}}}' \
--current-application-version-id ${current_application_version}
  • You can update the runtime of your applications in READY state to any Flink version. Amazon Managed Service for Apache Flink does not run any checks until you start your application.

  • Amazon Managed Service for Apache Flink only runs compatibility checks against the snapshot you selected to start the app. These are basic compatibility checks following the Flink Compatibility Table. They only check the Flink version with which the snapshot was taken and the Flink version you are targeting. If the Flink runtime of the selected snapshot is incompatible with the app's new runtime, the start request might be rejected.

Process flow for ready state applications


The following diagram represents the recommended workflow to upgrade the application while it is in READY state. We assume that the application is stateful and that you enabled snapshots. For this workflow, on update, you restore the application from the latest snapshot, which Amazon Managed Service for Apache Flink automatically took when the application was stopped.

Rollback

If you have issues with your application or find inconsistencies in your application code between Flink versions, you can roll back using the AWS CLI, AWS CloudFormation, AWS SDK, or the AWS Management Console. The following examples show what rolling back looks like in different failure scenarios.

Runtime upgrade succeeded, the application is in RUNNING state, but the job is failing and continuously restarting

Assume you are trying to upgrade a stateful application named TestApplication from Flink 1.15 to Flink 1.18 in US East (N. Virginia). However, the upgraded Flink 1.18 application is failing to start or is constantly restarting, even though the application is in RUNNING state. This is a common failure scenario. To avoid further downtime, we recommend that you roll back your application immediately to the previous running version (Flink 1.15), and diagnose the issue later.

To roll back the application to the previous running version, use the rollback-application AWS CLI command or the RollbackApplication API action. This API action rolls back the changes you've made that resulted in the latest version. Then it restarts your application using the latest successful snapshot.
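A sketch of the request body for rollback-application, which you could pass with the CLI's --cli-input-json parameter; the version id below is a placeholder and must match the version reported by describe-application after the failed upgrade:

```python
import json

# Request body for:
#   aws kinesisanalyticsv2 rollback-application --cli-input-json file://rollback.json
# CurrentApplicationVersionId is a placeholder; use the value from describe-application.
params = {
    "ApplicationName": "TestApplication",
    "CurrentApplicationVersionId": 8,
}
body = json.dumps(params)
print(body)
```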

We strongly recommend that you take a snapshot of your existing app before you attempt to upgrade. This helps you avoid data loss or having to reprocess data.

In this failure scenario, AWS CloudFormation will not roll back the application for you. You must update the CloudFormation template to point to the previous runtime and to the previous code to force CloudFormation to update the application. Otherwise, CloudFormation assumes that your application has been updated when it transitions to the RUNNING state.

Rolling back an application that is stuck in UPDATING

If your application gets stuck in the UPDATING or AUTOSCALING state after an upgrade attempt, use the rollback-application AWS CLI command or the RollbackApplication API action to roll back the application to the version before the stuck UPDATING or AUTOSCALING state. This API rolls back the changes that caused the application to get stuck in the transient UPDATING or AUTOSCALING state.

General best practices and recommendations

  • Test the new job/runtime without state on a non-production environment before attempting a production upgrade.

  • Consider testing the stateful upgrade with a non-production application first.

  • Make sure that the state of your new job graph is compatible with the snapshot you will use to start your upgraded application.

    • Make sure that the types stored in operator states stay the same. If the type has changed, Apache Flink can't restore the operator state.

    • Make sure that the Operator IDs you set using the uid method remain the same. Apache Flink has a strong recommendation for assigning unique IDs to operators. For more information, see Assigning Operator IDs in the Apache Flink documentation.

      If you don't assign IDs to your operators, Flink generates them automatically. In that case, they might depend on the program structure and, if the structure changes, can cause compatibility issues. Flink uses Operator IDs to match the state in a snapshot to each operator. Changing Operator IDs results in the application not starting, or in the state stored in the snapshot being dropped and the new operator starting without state.

    • Don't change the key used to store the keyed state.

    • Don't modify the input type of stateful operators like window or join. This implicitly changes the type of the internal state of the operator, causing a state incompatibility.

Precautions and known issues

Known limitations of state compatibility

  • If you are using the Table API, Apache Flink doesn't guarantee state compatibility between Flink versions. For more information, see Stateful Upgrades and Evolution in the Apache Flink documentation.

  • Flink 1.6 state is not compatible with Flink 1.18. The API rejects your request if you try to upgrade from 1.6 to 1.18 or later with state. You can upgrade to 1.8, 1.11, 1.13, or 1.15, take a snapshot, and then upgrade to 1.18 or later. For more information, see Upgrading Applications and Flink Versions in the Apache Flink documentation.

Known issues with the Flink Kinesis Connector

  • If you are using Flink 1.11 or earlier and using the amazon-kinesis-connector-flink connector for enhanced fan-out (EFO) support, you must take extra steps for a stateful upgrade to Flink 1.13 or later. This is because of the change in the package name of the connector. For more information, see amazon-kinesis-connector-flink.

    The amazon-kinesis-connector-flink connector for Flink 1.11 and earlier uses the package name software.amazon.kinesis, whereas the Kinesis connector for Flink 1.13 and later uses org.apache.flink.streaming.connectors.kinesis. Use this tool to support your migration: amazon-kinesis-connector-flink-state-migrator.

  • If you are using Flink 1.13 or earlier with FlinkKinesisProducer and upgrading to Flink 1.15 or later, for a stateful upgrade you must continue to use FlinkKinesisProducer in Flink 1.15 or later, instead of the newer KinesisStreamsSink. However, if you already have a custom uid set on your sink, you should be able to switch to KinesisStreamsSink because FlinkKinesisProducer doesn't keep state. Flink will treat it as the same operator because a custom uid is set.

Flink applications written in Scala

  • As of Flink 1.15, Apache Flink doesn't include Scala in the runtime. You must include the version of Scala you want to use and other Scala dependencies in your code JAR/zip when upgrading to Flink 1.15 or later. For more information, see Amazon Managed Service for Apache Flink for Apache Flink 1.15.2 release.

  • If your application uses Scala and you are upgrading it from Flink 1.11 or earlier (Scala 2.11) to Flink 1.13 (Scala 2.12), make sure that your code uses Scala 2.12. Otherwise, your Flink 1.13 application may fail to find Scala 2.11 classes in the Flink 1.13 runtime.

Things to consider when downgrading Flink applications

  • Downgrading Flink applications is possible, but it is limited to cases where the application previously ran with the older Flink version. For a stateful downgrade, Managed Service for Apache Flink requires a snapshot taken with a matching or earlier version.

  • If you are updating your runtime from Flink 1.13 or later to Flink 1.11 or earlier, and if your app uses the HashMap state backend, your application will continuously fail.